diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -57,6 +57,7 @@ PartialBranches, Sha1, ) +from swh.storage.metrics import process_metrics, send_metric, timed from swh.storage.objstorage import ObjStorage from swh.storage.utils import map_optional, now from swh.storage.writer import JournalWriter @@ -136,6 +137,7 @@ self._hosts, self._keyspace, self._port, self._consistency_level ) + @timed def check_config(self, *, check_write: bool) -> bool: self._cql_runner.check_read() @@ -238,6 +240,8 @@ return summary + @timed + @process_metrics def content_add(self, content: List[Content]) -> Dict[str, int]: to_add = { (c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content @@ -245,6 +249,7 @@ contents = [attr.evolve(c, ctime=now()) for c in to_add] return self._content_add(list(contents), with_data=True) + @timed def content_update( self, contents: List[Dict[str, Any]], keys: List[str] = [] ) -> None: @@ -252,13 +257,17 @@ "content_update is not supported by the Cassandra backend" ) + @timed + @process_metrics def content_add_metadata(self, content: List[Content]) -> Dict[str, int]: return self._content_add(content, with_data=False) + @timed def content_get_data(self, content: Sha1) -> Optional[bytes]: # FIXME: Make this method support slicing the `data` return self.objstorage.content_get(content) + @timed def content_get_partition( self, partition_id: int, @@ -300,6 +309,7 @@ assert len(contents) <= limit return PagedResult(results=contents, next_page_token=next_page_token) + @timed def content_get( self, contents: List[bytes], algo: str = "sha1" ) -> List[Optional[Content]]: @@ -320,6 +330,7 @@ contents_by_hash[key(content)] = content return [contents_by_hash.get(hash_) for hash_ in contents] + @timed def content_find(self, content: Dict[str, Any]) -> List[Content]: # Find an algorithm that is common to all the requested contents. # It will be used to do an initial filtering efficiently. @@ -347,6 +358,7 @@ results.append(Content(**row_d)) return results + @timed def content_missing( self, contents: List[Dict[str, Any]], key_hash: str = "sha1" ) -> Iterable[bytes]: @@ -375,9 +387,11 @@ if not res: yield content[key_hash] + @timed def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]: return self.content_missing([{"sha1": c} for c in contents]) + @timed def content_missing_per_sha1_git( self, contents: List[Sha1Git] ) -> Iterable[Sha1Git]: @@ -385,6 +399,7 @@ [{"sha1_git": c} for c in contents], key_hash="sha1_git" ) + @timed def content_get_random(self) -> Sha1Git: content = self._cql_runner.content_get_random() assert content, "Could not find any content" @@ -416,10 +431,13 @@ return {"skipped_content:add": len(contents)} + @timed + @process_metrics def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]: contents = [attr.evolve(c, ctime=now()) for c in content] return self._skipped_content_add(contents) + @timed def skipped_content_missing( self, contents: List[Dict[str, Any]] ) -> Iterable[Dict[str, Any]]: @@ -427,6 +445,8 @@ if not self._cql_runner.skipped_content_get_from_pk(content): yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} + @timed + @process_metrics def directory_add(self, directories: List[Directory]) -> Dict[str, int]: to_add = {d.id: d for d in directories}.values() if not self._allow_overwrite: @@ -450,6 +470,7 @@ return {"directory:add": len(directories)} + @timed def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.directory_missing(directories) @@ -504,6 +525,7 @@ ret["target"], True, prefix + ret["name"] + b"/" ) + @timed def directory_entry_get_by_path( self, directory: Sha1Git, paths: List[bytes] ) -> Optional[Dict[str, Any]]: @@ -543,11 +565,13 @@ first_item["target"], paths[1:], prefix + paths[0] + b"/" ) + @timed def directory_ls( self, directory: Sha1Git, recursive: bool = False ) -> Iterable[Dict[str, Any]]: yield from self._directory_ls(directory, recursive) + @timed def directory_get_entries( self, directory_id: Sha1Git, @@ -572,11 +596,14 @@ next_page_token = None return PagedResult(results=entries, next_page_token=next_page_token) + @timed def directory_get_random(self) -> Sha1Git: directory = self._cql_runner.directory_get_random() assert directory, "Could not find any directory" return directory.id + @timed + @process_metrics def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: # Filter-out revisions already in the database if not self._allow_overwrite: @@ -604,9 +631,11 @@ return {"revision:add": len(revisions)} + @timed def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.revision_missing(revisions) + @timed def revision_get(self, revision_ids: List[Sha1Git]) -> List[Optional[Revision]]: rows = self._cql_runner.revision_get(revision_ids) revisions: Dict[Sha1Git, Revision] = {} @@ -673,23 +702,28 @@ yield rev.to_dict() yield from self._get_parent_revs(parents, seen, limit, short) + @timed def revision_log( self, revisions: List[Sha1Git], limit: Optional[int] = None ) -> Iterable[Optional[Dict[str, Any]]]: seen: Set[Sha1Git] = set() yield from self._get_parent_revs(revisions, seen, limit, False) + @timed def revision_shortlog( self, revisions: List[Sha1Git], limit: Optional[int] = None ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: seen: Set[Sha1Git] = set() yield from self._get_parent_revs(revisions, seen, limit, True) + @timed def revision_get_random(self) -> Sha1Git: revision = self._cql_runner.revision_get_random() assert revision, "Could not find any revision" return revision.id + @timed + @process_metrics def release_add(self, releases: List[Release]) -> Dict[str, int]: if not self._allow_overwrite: to_add = {r.id: r for r in releases}.values() @@ -703,9 +737,11 @@ return {"release:add": len(releases)} + @timed def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.release_missing(releases) + @timed def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: rows = self._cql_runner.release_get(releases) rels: Dict[Sha1Git, Release] = {} @@ -715,11 +751,14 @@ return [rels.get(rel_id) for rel_id in releases] + @timed def release_get_random(self) -> Sha1Git: release = self._cql_runner.release_get_random() assert release, "Could not find any release" return release.id + @timed + @process_metrics def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: if not self._allow_overwrite: to_add = {s.id: s for s in snapshots}.values() @@ -753,9 +792,11 @@ return {"snapshot:add": len(snapshots)} + @timed def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.snapshot_missing(snapshots) + @timed def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]: d = self.snapshot_get_branches(snapshot_id) if d is None: @@ -769,6 +810,7 @@ "next_branch": d["next_branch"], } + @timed def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Optional[Dict[Optional[str], int]]: @@ -781,6 +823,7 @@ snapshot_id, branch_name_exclude_prefix ) + @timed def snapshot_get_branches( self, snapshot_id: Sha1Git, @@ -856,11 +899,13 @@ next_branch=last_branch, ) + @timed def snapshot_get_random(self) -> Sha1Git: snapshot = self._cql_runner.snapshot_get_random() assert snapshot, "Could not find any snapshot" return snapshot.id + @timed def object_find_by_sha1_git(self, ids: List[Sha1Git]) -> Dict[Sha1Git, List[Dict]]: results: Dict[Sha1Git, List[Dict]] = {id_: [] for id_ in ids} missing_ids = set(ids) @@ -888,9 +933,11 @@ return results + @timed def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]: return [self.origin_get_one(origin) for origin in origins] + @timed def origin_get_one(self, origin_url: str) -> Optional[Origin]: """Given an origin url, return the origin if it exists, None otherwise @@ -902,6 +949,7 @@ else: return None + @timed def origin_get_by_sha1(self, sha1s: List[bytes]) -> List[Optional[Dict[str, Any]]]: results = [] for sha1 in sha1s: @@ -910,6 +958,7 @@ results.append(origin) return results + @timed def origin_list( self, page_token: Optional[str] = None, limit: int = 100 ) -> PagedResult[Origin]: @@ -938,6 +987,7 @@ return PagedResult(results=origins, next_page_token=next_page_token) + @timed def origin_search( self, url_pattern: str, @@ -985,6 +1035,7 @@ assert len(origins) <= limit return PagedResult(results=origins, next_page_token=next_page_token) + @timed def origin_count( self, url_pattern: str, regexp: bool = False, with_visit: bool = False ) -> int: @@ -992,6 +1043,8 @@ "The Cassandra backend does not implement origin_count" ) + @timed + @process_metrics def origin_add(self, origins: List[Origin]) -> Dict[str, int]: if not self._allow_overwrite: to_add = {o.url: o for o in origins}.values() @@ -1004,6 +1057,7 @@ ) return {"origin:add": len(origins)} + @timed def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: for visit in visits: origin = self.origin_get_one(visit.origin) @@ -1037,7 +1091,7 @@ snapshot=None, ) ) - + send_metric("origin_visit:add", count=nb_visits, method_name="origin_visit") return all_visits def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None: @@ -1058,6 +1112,8 @@ converters.visit_status_to_row(visit_status) ) + @timed + @process_metrics def origin_visit_status_add( self, visit_statuses: List[OriginVisitStatus] ) -> Dict[str, int]: @@ -1111,6 +1167,7 @@ "date": visit.date.replace(tzinfo=datetime.timezone.utc), } + @timed def origin_visit_get( self, origin: str, @@ -1139,6 +1196,7 @@ return PagedResult(results=visits, next_page_token=next_page_token) + @timed def origin_visit_status_get( self, origin: str, @@ -1165,6 +1223,7 @@ return PagedResult(results=visit_statuses, next_page_token=next_page_token) + @timed def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime ) -> Optional[OriginVisit]: @@ -1181,12 +1240,14 @@ return converters.row_to_visit(min(rows, key=key)) return None + @timed def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]: row = self._cql_runner.origin_visit_get_one(origin, visit) if row: return converters.row_to_visit(row) return None + @timed def origin_visit_get_latest( self, origin: str, @@ -1234,6 +1295,7 @@ type=latest_visit["type"], ) + @timed def origin_visit_status_get_latest( self, origin_url: str, @@ -1256,6 +1318,7 @@ return None return converters.row_to_visit_status(rows[0]) + @timed def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]: back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back @@ -1271,6 +1334,7 @@ return visit_status return None + @timed def stat_counters(self): rows = self._cql_runner.stat_counters() keys = ( @@ -1287,9 +1351,12 @@ stats.update({row.object_type: row.count for row in rows}) return stats + @timed def refresh_stat_counters(self): pass + @timed + @process_metrics def raw_extrinsic_metadata_add( self, metadata: List[RawExtrinsicMetadata] ) -> Dict[str, int]: @@ -1349,6 +1416,7 @@ f"{type.value}_metadata:add": count for (type, count) in counter.items() } + @timed def raw_extrinsic_metadata_get( self, target: ExtendedSWHID, @@ -1396,6 +1464,7 @@ return PagedResult(next_page_token=next_page_token, results=results,) + @timed def raw_extrinsic_metadata_get_by_ids( self, ids: List[Sha1Git] ) -> List[RawExtrinsicMetadata]: @@ -1418,6 +1487,7 @@ return list(results) + @timed def raw_extrinsic_metadata_get_authorities( self, target: ExtendedSWHID ) -> List[MetadataAuthority]: @@ -1430,6 +1500,8 @@ ) ] + @timed + @process_metrics def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]: self.journal_writer.metadata_fetcher_add(fetchers) for fetcher in fetchers: @@ -1438,6 +1510,7 @@ ) return {"metadata_fetcher:add": len(fetchers)} + @timed def metadata_fetcher_get( self, name: str, version: str ) -> Optional[MetadataFetcher]: @@ -1447,6 +1520,8 @@ else: return None + @timed + @process_metrics def metadata_authority_add( self, authorities: List[MetadataAuthority] ) -> Dict[str, int]: @@ -1457,6 +1532,7 @@ ) return {"metadata_authority:add": len(authorities)} + @timed def metadata_authority_get( self, type: MetadataAuthorityType, url: str ) -> Optional[MetadataAuthority]: @@ -1469,6 +1545,8 @@ return None # ExtID tables + @timed + @process_metrics def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: if not self._allow_overwrite: extids = [ @@ -1506,6 +1584,7 @@ inserted += 1 return {"extid:add": inserted} + @timed def extid_get_from_extid(self, id_type: str, ids: List[bytes]) -> List[ExtID]: result: List[ExtID] = [] for extid in ids: @@ -1523,6 +1602,7 @@ ) return result + @timed def extid_get_from_target( self, target_type: SwhidObjectType, ids: List[Sha1Git] ) -> List[ExtID]: