# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Any, Dict, Iterable, List

from methodtools import lru_cache
import psycopg2.extensions

from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from swh.storage import get_storage


class ArchivePostgreSQL:
    """Archive accessor that queries the archive's PostgreSQL database
    directly, bypassing the swh.storage API.

    Each query method is wrapped in ``statsd.timed`` so that direct-access
    latency is reported under the
    ``swh_provenance_archive_direct_accesstime_seconds`` metric, tagged with
    the method name.
    """

    def __init__(self, conn: psycopg2.extensions.connection) -> None:
        # A storage instance is kept on the same DSN; only the raw
        # connection is used by the query methods below.
        self.storage = get_storage(
            "postgresql", db=conn.dsn, objstorage={"cls": "memory"}
        )
        self.conn = conn

    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
        """Yield the entries of directory ``id`` as dicts with keys
        ``type``, ``target`` and ``name``.

        Thin generator wrapper over the cached ``_directory_ls``.
        """
        entries = self._directory_ls(id)
        yield from entries

    # NOTE: methodtools' method-aware lru_cache; placed *outside* the statsd
    # timer so that cache hits are not counted as database access time.
    @lru_cache(maxsize=100000)
    @statsd.timed(
        metric="swh_provenance_archive_direct_accesstime_seconds",
        tags={"method": "directory_ls"},
    )
    def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]:
        """List the entries of directory ``id`` with a single SQL query.

        The query UNIONs sub-directory entries with file entries; file
        entries are looked up first in ``content`` and, when absent there,
        in ``skipped_content``.  The fourth selected column (``sha1_git``)
        is not returned to the caller; it only drives the NOT EXISTS
        de-duplication between the two content tables.
        """
        # TODO: add file size filtering
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                WITH
                dir  AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries
                         FROM directory WHERE id=%s),
                ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir),
                ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir),
                ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir)
                (SELECT 'dir'::directory_entry_type AS type,
                        e.target, e.name, NULL::sha1_git
                 FROM ls_d
                 LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id)
                UNION
                (WITH known_contents AS
                    (SELECT 'file'::directory_entry_type AS type,
                            e.target, e.name, c.sha1_git
                     FROM ls_f
                     LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                     INNER JOIN content c ON e.target=c.sha1_git)
                 SELECT * FROM known_contents
                 UNION
                 (SELECT 'file'::directory_entry_type AS type,
                         e.target, e.name, c.sha1_git
                  FROM ls_f
                  LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
                  LEFT JOIN skipped_content c ON e.target=c.sha1_git
                  WHERE NOT EXISTS (
                    SELECT 1 FROM known_contents
                    WHERE known_contents.sha1_git=e.target
                  )
                 )
                )
                """,
                (id,),
            )
            return [
                {"type": row[0], "target": row[1], "name": row[2]}
                for row in cursor
            ]

    @statsd.timed(
        metric="swh_provenance_archive_direct_accesstime_seconds",
        tags={"method": "revision_get_parents"},
    )
    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Yield the parent ids of revision ``id``, ordered by parent rank.

        A merge revision yields several rows, one per parent.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT RH.parent_id::bytea
                FROM revision_history AS RH
                WHERE RH.id=%s
                ORDER BY RH.parent_rank
                """,
                (id,),
            )
            yield from (row[0] for row in cursor)

    @statsd.timed(
        metric="swh_provenance_archive_direct_accesstime_seconds",
        tags={"method": "snapshot_get_heads"},
    )
    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Yield the ids of the head revisions of snapshot ``id``.

        Heads are revisions pointed to either directly by a 'revision'
        branch, or indirectly through a 'release' branch whose target is a
        revision.  Results are ordered by (revision date, id), matching the
        ordering produced by the storage-API accessor.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                WITH
                snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s),
                heads AS (
                  (SELECT R.id, R.date
                   FROM snaps
                   JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id)
                   JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id)
                   JOIN revision AS R ON (B.target=R.id)
                   WHERE B.target_type='revision'::snapshot_target)
                  UNION
                  (SELECT RV.id, RV.date
                   FROM snaps
                   JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id)
                   JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id)
                   JOIN release AS RL ON (B.target=RL.id)
                   JOIN revision AS RV ON (RL.target=RV.id)
                   WHERE B.target_type='release'::snapshot_target
                     AND RL.target_type='revision'::object_type)
                  ORDER BY date, id
                )
                SELECT id FROM heads
                """,
                (id,),
            )
            yield from (row[0] for row in cursor)
# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime
from typing import Any, Dict, Iterable, Set, Tuple

from swh.core.statsd import statsd
from swh.model.model import ObjectType, Sha1Git, TargetType
from swh.storage.interface import StorageInterface


class ArchiveStorage:
    """Archive accessor backed by the high-level ``swh.storage`` API.

    Each query method is wrapped in ``statsd.timed`` so API-access latency
    is reported under ``swh_provenance_archive_api_accesstime_seconds``,
    tagged with the method name.
    """

    def __init__(self, storage: StorageInterface) -> None:
        self.storage = storage

    @statsd.timed(
        metric="swh_provenance_archive_api_accesstime_seconds",
        tags={"method": "directory_ls"},
    )
    def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]:
        """Yield the entries of directory ``id`` as dicts with keys
        ``name``, ``target`` and ``type``.
        """
        # TODO: add file size filtering
        for entry in self.storage.directory_ls(id):
            yield {
                "name": entry["name"],
                "target": entry["target"],
                "type": entry["type"],
            }

    @statsd.timed(
        metric="swh_provenance_archive_api_accesstime_seconds",
        tags={"method": "revision_get_parents"},
    )
    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Yield the parent ids of revision ``id``; yields nothing when the
        revision is unknown to the storage."""
        rev = self.storage.revision_get([id])[0]
        if rev is not None:
            yield from rev.parents

    @statsd.timed(
        metric="swh_provenance_archive_api_accesstime_seconds",
        tags={"method": "snapshot_get_heads"},
    )
    def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
        """Yield the ids of the head revisions of snapshot ``id``.

        Heads are revisions reached either directly from a REVISION branch
        or via a RELEASE branch targeting a revision.  Results are ordered
        by (revision date, id), matching the direct-SQL accessor.

        Raises:
            AssertionError: if the snapshot is unknown to the storage.
        """
        # Imported lazily, as in the rest of this module's style.
        from swh.core.utils import grouper
        from swh.storage.algos.snapshot import snapshot_get_all_branches

        snapshot = snapshot_get_all_branches(self.storage, id)
        # Fix: the original guarded the loop below with a second
        # `if snapshot is not None:` that was unreachable-false after this
        # assert; the redundant check is removed.
        assert snapshot is not None

        targets_set = set()
        releases_set = set()
        # Iterate branch values directly instead of re-indexing the dict
        # by key on every access.
        for branch in snapshot.branches.values():
            if branch.target_type == TargetType.REVISION:
                targets_set.add(branch.target)
            elif branch.target_type == TargetType.RELEASE:
                releases_set.add(branch.target)

        batchsize = 100
        # Resolve release targets to revisions, in batches.
        for releases in grouper(releases_set, batchsize):
            targets_set.update(
                release.target
                for release in self.storage.release_get(list(releases))
                if release is not None
                and release.target_type == ObjectType.REVISION
            )

        # Fetch the dates of all candidate revisions so heads can be
        # yielded in (date, id) order; undated/unknown revisions are skipped.
        revisions: Set[Tuple[datetime, Sha1Git]] = set()
        for targets in grouper(targets_set, batchsize):
            revisions.update(
                (revision.date.to_datetime(), revision.id)
                for revision in self.storage.revision_get(list(targets))
                if revision is not None and revision.date is not None
            )

        yield from (head for _, head in sorted(revisions))
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2", "out-of-order", "with-merges"),
)
def test_archive_interface(repo: str, swh_storage: StorageInterface) -> None:
    """Check that the storage-API accessor (ArchiveStorage) and the direct
    PostgreSQL accessor (ArchivePostgreSQL) return equivalent results for
    directory listings, revision parents and snapshot heads on the same
    dataset.
    """
    archive_api = ArchiveStorage(swh_storage)
    # The direct accessor needs a raw connection on the same database.
    assert isinstance(swh_storage, Storage)
    dsn = swh_storage.get_db().conn.dsn
    with BaseDb.connect(dsn).conn as conn:
        BaseDb.adapt_conn(conn)
        archive_direct = ArchivePostgreSQL(conn)
        # read data/README.md for more details on how these datasets are generated
        data = load_repo_data(repo)
        fill_storage(swh_storage, data)

        # directory_ls yields in unspecified order: compare sorted by name.
        for directory in data["directory"]:
            entries_api = sorted(
                archive_api.directory_ls(directory["id"]), key=itemgetter("name")
            )
            entries_direct = sorted(
                archive_direct.directory_ls(directory["id"]), key=itemgetter("name")
            )
            assert entries_api == entries_direct

        # Counter comparison: same multiset of parents, order-insensitive.
        for revision in data["revision"]:
            parents_api: TCounter[Sha1Git] = Counter(
                archive_api.revision_get_parents(revision["id"])
            )
            parents_direct: TCounter[Sha1Git] = Counter(
                archive_direct.revision_get_parents(revision["id"])
            )
            assert parents_api == parents_direct

        # Same multiset of snapshot heads from both accessors.
        for snapshot in data["snapshot"]:
            heads_api: TCounter[Sha1Git] = Counter(
                archive_api.snapshot_get_heads(snapshot["id"])
            )
            heads_direct: TCounter[Sha1Git] = Counter(
                archive_direct.snapshot_get_heads(snapshot["id"])
            )
            assert heads_api == heads_direct