diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
index 8907e25..c7e05e6 100644
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,110 +1,103 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from itertools import islice
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple

 from swh.core.statsd import statsd
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .graph import HistoryGraph
 from .interface import ProvenanceInterface
 from .model import OriginEntry, RevisionEntry

+ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds"
+

 class CSVOriginIterator:
     """Iterator over origin visit statuses typically present in the given CSV
     file.

     The input is an iterator that produces 2 elements per row:

       (url, snap)

     where:
     - url: is the origin url of the visit
     - snap: sha1_git of the snapshot pointed by the visit status
     """

     def __init__(
         self,
         statuses: Iterable[Tuple[str, Sha1Git]],
         limit: Optional[int] = None,
     ) -> None:
         self.statuses: Iterator[Tuple[str, Sha1Git]]
         if limit is not None:
             self.statuses = islice(statuses, limit)
         else:
             self.statuses = iter(statuses)

     def __iter__(self) -> Generator[OriginEntry, None, None]:
         return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)


-@statsd.timed(
-    metric="swh_provenance_origin_revision_layer_accesstime_seconds",
-    tags={"method": "main"},
-)
+@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "main"})
 def origin_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     origins: List[OriginEntry],
 ) -> None:
     for origin in origins:
         provenance.origin_add(origin)
         origin.retrieve_revisions(archive)
         for revision in origin.revisions:
             graph = HistoryGraph(archive, provenance, revision)
             origin_add_revision(provenance, origin, graph)
     provenance.flush()


-@statsd.timed(
-    metric="swh_provenance_origin_revision_layer_accesstime_seconds",
-    tags={"method": "process_revision"},
-)
+@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_revision"})
 def origin_add_revision(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     graph: HistoryGraph,
 ) -> None:
     # XXX: simplified version of the origin-revision algorithm. This is generating flat
     # models for the history of all head revisions. No previous result is reused now!
     # The previous implementation was missing some paths from origins to certain
     # revisions due to a wrong reuse logic.

     # head is treated separately since it should always be added to the given origin
     check_preferred_origin(provenance, origin, graph.head.entry)
     provenance.revision_add_to_origin(origin, graph.head.entry)
     visited = {graph.head}
     # head's history should be recursively iterated starting from its parents
     stack = list(graph.parents[graph.head])
     while stack:
         current = stack.pop()
         check_preferred_origin(provenance, origin, current.entry)
         # create a link between it and the head, and recursively walk its history
         provenance.revision_add_before_revision(graph.head.entry, current.entry)
         visited.add(current)
         for parent in graph.parents[current]:
             if parent not in visited:
                 stack.append(parent)


-@statsd.timed(
-    metric="swh_provenance_origin_revision_layer_accesstime_seconds",
-    tags={"method": "check_preferred_origin"},
-)
+@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"})
 def check_preferred_origin(
     provenance: ProvenanceInterface,
     origin: OriginEntry,
     revision: RevisionEntry,
 ) -> None:
     # if the revision has no preferred origin just set the given origin as the
     # preferred one. TODO: this should be improved in the future!
     preferred = provenance.revision_get_preferred_origin(revision)
     if preferred is None:
         provenance.revision_set_preferred_origin(origin, revision)
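Note on the decorator being reconfigured throughout this patch: `statsd.timed`
times each call of the wrapped function and emits the elapsed time under the
given metric name and tags, so hoisting the name into ORIGIN_DURATION_METRIC
changes only where the string lives, not what is emitted. A minimal,
self-contained sketch of that behaviour, assuming only the call shape visible
above; `timed` here is a hypothetical stand-in, the real implementation is the
one imported from swh.core.statsd:

    import functools
    import time
    from typing import Any, Callable, Dict

    def timed(metric: str, tags: Dict[str, str]) -> Callable:
        # Illustrative stand-in for swh.core.statsd's `timed` decorator.
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.monotonic()
                try:
                    return func(*args, **kwargs)
                finally:
                    elapsed = time.monotonic() - start
                    # A real statsd client would emit `metric` with `tags` here.
                    print(f"{metric} tags={tags} took {elapsed:.6f}s")
            return wrapper
        return decorator

    @timed(
        metric="swh_provenance_origin_revision_layer_duration_seconds",
        tags={"method": "main"},
    )
    def do_work() -> None:
        time.sleep(0.01)

    do_work()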
diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py
index c95cf69..26ed8b4 100644
--- a/swh/provenance/postgresql/archive.py
+++ b/swh/provenance/postgresql/archive.py
@@ -1,129 +1,124 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Any, Dict, Iterable, List

 from methodtools import lru_cache
 import psycopg2.extensions

 from swh.core.statsd import statsd
 from swh.model.model import Sha1Git
 from swh.storage import get_storage

+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_direct_duration_seconds"
+

 class ArchivePostgreSQL:
     def __init__(self, conn: psycopg2.extensions.connection) -> None:
         self.storage = get_storage(
             "postgresql", db=conn.dsn, objstorage={"cls": "memory"}
         )
         self.conn = conn

     def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
         entries = self._directory_ls(id)
         yield from entries

     @lru_cache(maxsize=100000)
-    @statsd.timed(
-        metric="swh_provenance_archive_direct_accesstime_seconds",
-        tags={"method": "directory_ls"},
-    )
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
     def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]:
         # TODO: add file size filtering
         with self.conn.cursor() as cursor:
             cursor.execute(
                 """
                 WITH
                 dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries
                         FROM directory WHERE id=%s),
                 ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir),
                 ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir),
                 ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir)
                 (SELECT 'dir'::directory_entry_type AS type, e.target, e.name,
                         NULL::sha1_git
                  FROM ls_d
                  LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id)
                 UNION
                 (WITH known_contents AS
                     (SELECT 'file'::directory_entry_type AS type, e.target, e.name,
                             c.sha1_git
                      FROM ls_f
                      LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                      INNER JOIN content c ON e.target=c.sha1_git)
                     SELECT * FROM known_contents
                     UNION
                     (SELECT 'file'::directory_entry_type AS type, e.target, e.name,
                             c.sha1_git
                      FROM ls_f
                      LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                      LEFT JOIN skipped_content c ON e.target=c.sha1_git
                      WHERE NOT EXISTS (
                         SELECT 1 FROM known_contents
                         WHERE known_contents.sha1_git=e.target
                      )
                     )
                 )
                 """,
                 (id,),
             )
             return [
                 {"type": row[0], "target": row[1], "name": row[2]} for row in cursor
             ]

     @statsd.timed(
-        metric="swh_provenance_archive_direct_accesstime_seconds",
-        tags={"method": "revision_get_parents"},
+        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
     )
     def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
         with self.conn.cursor() as cursor:
             cursor.execute(
                 """
                 SELECT RH.parent_id::bytea
                 FROM revision_history AS RH
                 WHERE RH.id=%s
                 ORDER BY RH.parent_rank
                 """,
                 (id,),
             )
             # There should be at most one row anyway
             yield from (row[0] for row in cursor)

-    @statsd.timed(
-        metric="swh_provenance_archive_direct_accesstime_seconds",
-        tags={"method": "snapshot_get_heads"},
-    )
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
     def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
         with self.conn.cursor() as cursor:
             cursor.execute(
                 """
                 WITH
                 snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s),
                 heads AS ((SELECT R.id, R.date
                            FROM snaps
                            JOIN snapshot_branches AS BS
                              ON (snaps.object_id=BS.snapshot_id)
                            JOIN snapshot_branch AS B
                              ON (BS.branch_id=B.object_id)
                            JOIN revision AS R
                              ON (B.target=R.id)
                            WHERE B.target_type='revision'::snapshot_target)
                           UNION
                           (SELECT RV.id, RV.date
                            FROM snaps
                            JOIN snapshot_branches AS BS
                              ON (snaps.object_id=BS.snapshot_id)
                            JOIN snapshot_branch AS B
                              ON (BS.branch_id=B.object_id)
                            JOIN release AS RL
                              ON (B.target=RL.id)
                            JOIN revision AS RV
                              ON (RL.target=RV.id)
                            WHERE B.target_type='release'::snapshot_target
                              AND RL.target_type='revision'::object_type)
                           ORDER BY date, id)
                 SELECT id FROM heads
                 """,
                 (id,),
             )
             yield from (row[0] for row in cursor)
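A detail worth keeping in mind for _directory_ls above: decorators apply
bottom-up, so `lru_cache` ends up wrapping the already-timed function. Cache
hits therefore return without ever entering the timing wrapper, and the
duration metric samples only cache misses, i.e. actual PostgreSQL round-trips.
A runnable sketch of that ordering, with `functools.lru_cache` standing in for
`methodtools.lru_cache` and a list-appending decorator (hypothetical) standing
in for the statsd timer:

    import functools

    calls = []

    def record(func):
        # Stand-in for the statsd timing wrapper.
        @functools.wraps(func)
        def wrapper(*args):
            calls.append(args)
            return func(*args)
        return wrapper

    @functools.lru_cache(maxsize=None)
    @record
    def double(x: int) -> int:
        return x * 2

    double(21)
    double(21)  # cache hit: `record` never runs, no sample is emitted
    assert len(calls) == 1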
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index 64aaa5e..05dfe5f 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,247 +1,240 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 import os
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple

 from swh.core.statsd import statsd
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
 from .graph import IsochroneNode, build_isochrone_graph
 from .interface import ProvenanceInterface
 from .model import DirectoryEntry, RevisionEntry

+REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"
+

 class CSVRevisionIterator:
     """Iterator over revisions typically present in the given CSV file.

     The input is an iterator that produces 3 elements per row:

       (id, date, root)

     where:
     - id: is the id (sha1_git) of the revision
     - date: is the author date
     - root: sha1 of the directory
     """

     def __init__(
         self,
         revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
         limit: Optional[int] = None,
     ) -> None:
         self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
         if limit is not None:
             from itertools import islice

             self.revisions = islice(revisions, limit)
         else:
             self.revisions = iter(revisions)

     def __iter__(self) -> Generator[RevisionEntry, None, None]:
         for id, date, root in self.revisions:
             if date.tzinfo is None:
                 date = date.replace(tzinfo=timezone.utc)
             yield RevisionEntry(id, date=date, root=root)


-@statsd.timed(
-    metric="swh_provenance_revision_content_layer_accesstime_seconds",
-    tags={"method": "main"},
-)
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
 def revision_add(
     provenance: ProvenanceInterface,
     archive: ArchiveInterface,
     revisions: List[RevisionEntry],
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
     commit: bool = True,
 ) -> None:
     for revision in revisions:
         assert revision.date is not None
         assert revision.root is not None
         # Processed content starting from the revision's root directory.
         date = provenance.revision_get_date(revision)
         if date is None or revision.date < date:
             graph = build_isochrone_graph(
                 archive,
                 provenance,
                 revision,
                 DirectoryEntry(revision.root),
             )
             # TODO: add file size filtering
             revision_process_content(
                 archive,
                 provenance,
                 revision,
                 graph,
                 trackall=trackall,
                 lower=lower,
                 mindepth=mindepth,
             )
     if commit:
         provenance.flush()


-@statsd.timed(
-    metric="swh_provenance_revision_content_layer_accesstime_seconds",
-    tags={"method": "process_content"},
-)
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"})
 def revision_process_content(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     revision: RevisionEntry,
     graph: IsochroneNode,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ) -> None:
     assert revision.date is not None
     provenance.revision_add(revision)

     stack = [graph]
     while stack:
         current = stack.pop()
         if current.dbdate is not None:
             assert current.dbdate <= revision.date
             if trackall:
                 # Current directory is an outer isochrone frontier for a previously
                 # processed revision. It should be reused as is.
                 provenance.directory_add_to_revision(
                     revision, current.entry, current.path
                 )
         else:
             assert current.maxdate is not None
             # Current directory is not an outer isochrone frontier for any previous
             # revision. It might be eligible for this one.
             if is_new_frontier(
                 current,
                 revision=revision,
                 trackall=trackall,
                 lower=lower,
                 mindepth=mindepth,
             ):
                 # Outer frontier should be moved to current position in the isochrone
                 # graph. This is the first time this directory is found in the isochrone
                 # frontier.
                 provenance.directory_set_date_in_isochrone_frontier(
                     current.entry, current.maxdate
                 )
                 if trackall:
                     provenance.directory_add_to_revision(
                         revision, current.entry, current.path
                     )
                 flatten_directory(archive, provenance, current.entry)
             else:
                 # If current node is an invalidated frontier, update its date for future
                 # revisions to get the proper value.
                 if current.invalid:
                     provenance.directory_set_date_in_isochrone_frontier(
                         current.entry, current.maxdate
                     )
                 # No point moving the frontier here. Either there are no files or they
                 # are being seen for the first time here. Add all blobs to current
                 # revision updating date if necessary, and recursively analyse
                 # subdirectories as candidates to the outer frontier.
                 for blob in current.entry.files:
                     date = provenance.content_get_early_date(blob)
                     if date is None or revision.date < date:
                         provenance.content_set_early_date(blob, revision.date)
                     provenance.content_add_to_revision(revision, blob, current.path)
                 for child in current.children:
                     stack.append(child)


-@statsd.timed(
-    metric="swh_provenance_revision_content_layer_accesstime_seconds",
-    tags={"method": "flatten_directory"},
-)
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten_directory"})
 def flatten_directory(
     archive: ArchiveInterface,
     provenance: ProvenanceInterface,
     directory: DirectoryEntry,
 ) -> None:
     """Recursively retrieve all the files of 'directory' and insert them in the
     'provenance' database in the 'content_to_directory' table.
     """
     stack = [(directory, b"")]
     while stack:
         current, prefix = stack.pop()
         current.retrieve_children(archive)
         for f_child in current.files:
             # Add content to the directory with the computed prefix.
             provenance.content_add_to_directory(directory, f_child, prefix)
         for d_child in current.dirs:
             # Recursively walk the child directory.
             stack.append((d_child, os.path.join(prefix, d_child.name)))


 def is_new_frontier(
     node: IsochroneNode,
     revision: RevisionEntry,
     trackall: bool = True,
     lower: bool = True,
     mindepth: int = 1,
 ) -> bool:
     assert node.maxdate is not None  # for mypy
     assert revision.date is not None  # idem
     if trackall:
-        # The only real condition for a directory to be a frontier is that its
-        # content is already known and its maxdate is less (or equal) than
-        # current revision's date. Checking mindepth is meant to skip root
-        # directories (or any arbitrary depth) to improve the result. The
-        # option lower tries to maximize the reusage rate of previously defined
-        # frontiers by keeping them low in the directory tree.
+        # The only real condition for a directory to be a frontier is that its content
+        # is already known and its maxdate is earlier than (or equal to) the current
+        # revision's date. Checking mindepth is meant to skip root directories (or any
+        # arbitrary depth) to improve the result. The option lower tries to maximize
+        # the reuse rate of previously defined frontiers by keeping them low in the
+        # directory tree.
         return (
             node.known
             and node.maxdate <= revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # current node is deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob in it
         )
     else:
         # If we are only tracking first occurrences, we want to ensure that all first
         # occurrences end up in the content_early_in_rev relation. Thus, we force for
-        # every blob outside a frontier to have an extrictly earlier date.
+        # every blob outside a frontier to have a strictly earlier date.
         return (
             node.maxdate < revision.date  # all content is earlier than revision
             and node.depth >= mindepth  # deeper than the min allowed depth
             and (has_blobs(node) if lower else True)  # there is at least one blob
         )


 def has_blobs(node: IsochroneNode) -> bool:
     # We may want to look for files in different ways to decide whether to define a
     # frontier or not:
     # 1. Only files in current node:
     return any(node.entry.files)
     # 2. Files anywhere in the isochrone graph
     # stack = [node]
     # while stack:
     #     current = stack.pop()
     #     if any(
     #         map(lambda child: isinstance(child.entry, FileEntry), current.children)):
     #         return True
     #     else:
     #         # All children are directory entries.
     #         stack.extend(current.children)
     # return False
     # 3. Files in the intermediate directories between current node and any previously
     #    defined frontier:
     # TODO: complete this case!
     # return any(
     #     map(lambda child: isinstance(child.entry, FileEntry), node.children)
     # ) or all(
     #     map(
     #         lambda child: (
     #             not (isinstance(child.entry, DirectoryEntry) and child.date is None)
     #         )
     #         or has_blobs(child),
     #         node.children,
     #     )
     # )
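The date logic in is_new_frontier above hinges on an inclusive versus strict
comparison: with trackall, a directory whose maxdate equals the revision date
can still become a frontier (<=), while in first-occurrence-only mode it
cannot (<), so that every first occurrence is forced into the
content_early_in_rev relation. A toy illustration of just that distinction,
with plain datetimes standing in for the node and revision entries:

    from datetime import datetime, timezone

    maxdate = datetime(2021, 6, 1, tzinfo=timezone.utc)       # node.maxdate
    revision_date = datetime(2021, 6, 1, tzinfo=timezone.utc)  # revision.date

    assert maxdate <= revision_date        # trackall mode: still eligible
    assert not (maxdate < revision_date)   # first-occurrence mode: rejected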
diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py
index ab82904..417f614 100644
--- a/swh/provenance/storage/archive.py
+++ b/swh/provenance/storage/archive.py
@@ -1,76 +1,71 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime
 from typing import Any, Dict, Iterable, Set, Tuple

 from swh.core.statsd import statsd
 from swh.model.model import ObjectType, Sha1Git, TargetType
 from swh.storage.interface import StorageInterface

+ARCHIVE_DURATION_METRIC = "swh_provenance_archive_api_duration_seconds"
+

 class ArchiveStorage:
     def __init__(self, storage: StorageInterface) -> None:
         self.storage = storage

-    @statsd.timed(
-        metric="swh_provenance_archive_api_accesstime_seconds",
-        tags={"method": "directory_ls"},
-    )
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
     def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
         # TODO: add file size filtering
         for entry in self.storage.directory_ls(id):
             yield {
                 "name": entry["name"],
                 "target": entry["target"],
                 "type": entry["type"],
             }

     @statsd.timed(
-        metric="swh_provenance_archive_api_accesstime_seconds",
-        tags={"method": "revision_get_parents"},
+        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
     )
     def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
         rev = self.storage.revision_get([id])[0]
         if rev is not None:
             yield from rev.parents

-    @statsd.timed(
-        metric="swh_provenance_archive_api_accesstime_seconds",
-        tags={"method": "snapshot_get_heads"},
-    )
+    @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
     def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
         from swh.core.utils import grouper
         from swh.storage.algos.snapshot import snapshot_get_all_branches

         snapshot = snapshot_get_all_branches(self.storage, id)
         assert snapshot is not None

         targets_set = set()
         releases_set = set()
         if snapshot is not None:
             for branch in snapshot.branches:
                 if snapshot.branches[branch].target_type == TargetType.REVISION:
                     targets_set.add(snapshot.branches[branch].target)
                 elif snapshot.branches[branch].target_type == TargetType.RELEASE:
                     releases_set.add(snapshot.branches[branch].target)

         batchsize = 100
         for releases in grouper(releases_set, batchsize):
             targets_set.update(
                 release.target
                 for release in self.storage.release_get(list(releases))
                 if release is not None and release.target_type == ObjectType.REVISION
             )

         revisions: Set[Tuple[datetime, Sha1Git]] = set()
         for targets in grouper(targets_set, batchsize):
             revisions.update(
                 (revision.date.to_datetime(), revision.id)
                 for revision in self.storage.revision_get(list(targets))
                 if revision is not None and revision.date is not None
             )

         yield from (head for _, head in sorted(revisions))
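Since this patch renames every metric from *_accesstime_seconds to
*_duration_seconds, any dashboard or alert keyed on the old names will go
quiet once this is deployed. For convenience, the old-to-new mapping compiled
from the four hunks above (the dict and its name are illustrative, not part
of the patch):

    # Old -> new metric names introduced by this change.
    METRIC_RENAMES = {
        "swh_provenance_origin_revision_layer_accesstime_seconds":
            "swh_provenance_origin_revision_layer_duration_seconds",
        "swh_provenance_archive_direct_accesstime_seconds":
            "swh_provenance_archive_direct_duration_seconds",
        "swh_provenance_revision_content_layer_accesstime_seconds":
            "swh_provenance_revision_content_layer_duration_seconds",
        "swh_provenance_archive_api_accesstime_seconds":
            "swh_provenance_archive_api_duration_seconds",
    }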