diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index 2196be6..ad7e8d3 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,54 +1,53 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git from swh.storage.interface import StorageInterface @runtime_checkable class ArchiveInterface(Protocol): storage: StorageInterface def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: """List entries for one directory. Args: id: sha1 id of the directory to list entries from. Yields: dictionary of entries in such directory containing only the keys "name", "target" and "type". """ ... def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: """List parents of one revision. Args: revisions: sha1 id of the revision to list parents from. Yields: sha1 ids for the parents of such revision. """ ... def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions targeted by one snapshot. Args: id: sha1 id of the snapshot. Yields: - sha1 ids of revisions that are a target of such snapshot. Revisions are - guaranteed to be retrieved in chronological order + sha1 ids of revisions that are a target of such snapshot. """ ... diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 1fd3eaa..8ba2203 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,143 +1,143 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2.extensions from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.storage import get_storage ARCHIVE_DURATION_METRIC = "swh_provenance_archive_direct_duration_seconds" class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection) -> None: self.storage = get_storage( "postgresql", db=conn.dsn, objstorage={"cls": "memory"} ) self.conn = conn def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: yield from self._directory_ls(id, minsize=minsize) @lru_cache(maxsize=100000) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def _directory_ls(self, id: Sha1Git, minsize: int = 0) -> List[Dict[str, Any]]: with self.conn.cursor() as cursor: if minsize > 0: cursor.execute( """ WITH dir AS (SELECT dir_entries, file_entries FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git WHERE c.length >= %s ) SELECT * FROM known_contents UNION ALL (SELECT 'file'::directory_entry_type AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.target=e.target ) AND c.length >= %s ) ) """, (id, minsize, minsize), ) else: cursor.execute( """ WITH dir AS (SELECT dir_entries, file_entries FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL (SELECT 'file'::directory_entry_type AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id) """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor ] def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: yield from self._revision_get_parents(id) @lru_cache(maxsize=100000) @statsd.timed( metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} ) def _revision_get_parents(self, id: Sha1Git) -> List[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ SELECT RH.parent_id::bytea FROM revision_history AS RH WHERE RH.id=%s ORDER BY RH.parent_rank """, (id,), ) return [row[0] for row in cursor] @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ WITH snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s), heads AS ((SELECT R.id, R.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN revision AS R ON (B.target=R.id) WHERE B.target_type='revision'::snapshot_target) UNION (SELECT RV.id, RV.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN release AS RL ON (B.target=RL.id) JOIN revision AS RV ON (RL.target=RV.id) WHERE B.target_type='release'::snapshot_target AND RL.target_type='revision'::object_type) - ORDER BY date, id) + ) SELECT id FROM heads """, (id,), ) yield from (row[0] for row in cursor) diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py index 3093c29..6ee1339 100644 --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -1,73 +1,73 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Any, Dict, Iterable, Set, Tuple from swh.core.statsd import statsd from swh.model.model import ObjectType, Sha1Git, TargetType from swh.storage.interface import StorageInterface ARCHIVE_DURATION_METRIC = "swh_provenance_archive_api_duration_seconds" class ArchiveStorage: def __init__(self, storage: StorageInterface) -> None: self.storage = storage @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: for entry in self.storage.directory_ls(id): if entry["type"] == "dir" or ( entry["type"] == "file" and entry["length"] >= minsize ): yield { "name": entry["name"], "target": entry["target"], "type": entry["type"], } @statsd.timed( metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} ) def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: rev = self.storage.revision_get([id])[0] if rev is not None: yield from rev.parents @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper from swh.storage.algos.snapshot import snapshot_get_all_branches snapshot = snapshot_get_all_branches(self.storage, id) assert snapshot is not None targets_set = set() releases_set = set() if snapshot is not None: for branch in snapshot.branches: if snapshot.branches[branch].target_type == TargetType.REVISION: targets_set.add(snapshot.branches[branch].target) elif snapshot.branches[branch].target_type == TargetType.RELEASE: releases_set.add(snapshot.branches[branch].target) batchsize = 100 for releases in grouper(releases_set, batchsize): targets_set.update( release.target for release in self.storage.release_get(list(releases)) if release is not None and release.target_type == ObjectType.REVISION ) revisions: Set[Tuple[datetime, Sha1Git]] = set() for targets in grouper(targets_set, batchsize): revisions.update( (revision.date.to_datetime(), revision.id) for revision in self.storage.revision_get(list(targets)) if revision is not None and revision.date is not None ) - yield from (head for _, head in sorted(revisions)) + yield from (head for _, head in revisions)