D6162: cassandra: generate statsd metrics on method calls

D6162.diff (15 KB, text/plain)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -57,6 +57,7 @@
PartialBranches,
Sha1,
)
+from swh.storage.metrics import process_metrics, send_metric, timed
from swh.storage.objstorage import ObjStorage
from swh.storage.utils import map_optional, now
from swh.storage.writer import JournalWriter
@@ -136,6 +137,7 @@
self._hosts, self._keyspace, self._port, self._consistency_level
)
+ @timed
def check_config(self, *, check_write: bool) -> bool:
self._cql_runner.check_read()
@@ -238,6 +240,8 @@
return summary
+ @timed
+ @process_metrics
def content_add(self, content: List[Content]) -> Dict[str, int]:
to_add = {
(c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content
@@ -245,6 +249,7 @@
contents = [attr.evolve(c, ctime=now()) for c in to_add]
return self._content_add(list(contents), with_data=True)
+ @timed
def content_update(
self, contents: List[Dict[str, Any]], keys: List[str] = []
) -> None:
@@ -252,13 +257,17 @@
"content_update is not supported by the Cassandra backend"
)
+ @timed
+ @process_metrics
def content_add_metadata(self, content: List[Content]) -> Dict[str, int]:
return self._content_add(content, with_data=False)
+ @timed
def content_get_data(self, content: Sha1) -> Optional[bytes]:
# FIXME: Make this method support slicing the `data`
return self.objstorage.content_get(content)
+ @timed
def content_get_partition(
self,
partition_id: int,
@@ -300,6 +309,7 @@
assert len(contents) <= limit
return PagedResult(results=contents, next_page_token=next_page_token)
+ @timed
def content_get(
self, contents: List[bytes], algo: str = "sha1"
) -> List[Optional[Content]]:
@@ -320,6 +330,7 @@
contents_by_hash[key(content)] = content
return [contents_by_hash.get(hash_) for hash_ in contents]
+ @timed
def content_find(self, content: Dict[str, Any]) -> List[Content]:
# Find an algorithm that is common to all the requested contents.
# It will be used to do an initial filtering efficiently.
@@ -347,6 +358,7 @@
results.append(Content(**row_d))
return results
+ @timed
def content_missing(
self, contents: List[Dict[str, Any]], key_hash: str = "sha1"
) -> Iterable[bytes]:
@@ -375,9 +387,11 @@
if not res:
yield content[key_hash]
+ @timed
def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]:
return self.content_missing([{"sha1": c} for c in contents])
+ @timed
def content_missing_per_sha1_git(
self, contents: List[Sha1Git]
) -> Iterable[Sha1Git]:
@@ -385,6 +399,7 @@
[{"sha1_git": c} for c in contents], key_hash="sha1_git"
)
+ @timed
def content_get_random(self) -> Sha1Git:
content = self._cql_runner.content_get_random()
assert content, "Could not find any content"
@@ -416,10 +431,13 @@
return {"skipped_content:add": len(contents)}
+ @timed
+ @process_metrics
def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
contents = [attr.evolve(c, ctime=now()) for c in content]
return self._skipped_content_add(contents)
+ @timed
def skipped_content_missing(
self, contents: List[Dict[str, Any]]
) -> Iterable[Dict[str, Any]]:
@@ -427,6 +445,8 @@
if not self._cql_runner.skipped_content_get_from_pk(content):
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
+ @timed
+ @process_metrics
def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
to_add = {d.id: d for d in directories}.values()
if not self._allow_overwrite:
@@ -450,6 +470,7 @@
return {"directory:add": len(directories)}
+ @timed
def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
return self._cql_runner.directory_missing(directories)
@@ -504,6 +525,7 @@
ret["target"], True, prefix + ret["name"] + b"/"
)
+ @timed
def directory_entry_get_by_path(
self, directory: Sha1Git, paths: List[bytes]
) -> Optional[Dict[str, Any]]:
@@ -543,11 +565,13 @@
first_item["target"], paths[1:], prefix + paths[0] + b"/"
)
+ @timed
def directory_ls(
self, directory: Sha1Git, recursive: bool = False
) -> Iterable[Dict[str, Any]]:
yield from self._directory_ls(directory, recursive)
+ @timed
def directory_get_entries(
self,
directory_id: Sha1Git,
@@ -572,11 +596,14 @@
next_page_token = None
return PagedResult(results=entries, next_page_token=next_page_token)
+ @timed
def directory_get_random(self) -> Sha1Git:
directory = self._cql_runner.directory_get_random()
assert directory, "Could not find any directory"
return directory.id
+ @timed
+ @process_metrics
def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
# Filter-out revisions already in the database
if not self._allow_overwrite:
@@ -604,9 +631,11 @@
return {"revision:add": len(revisions)}
+ @timed
def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]:
return self._cql_runner.revision_missing(revisions)
+ @timed
def revision_get(self, revision_ids: List[Sha1Git]) -> List[Optional[Revision]]:
rows = self._cql_runner.revision_get(revision_ids)
revisions: Dict[Sha1Git, Revision] = {}
@@ -673,23 +702,28 @@
yield rev.to_dict()
yield from self._get_parent_revs(parents, seen, limit, short)
+ @timed
def revision_log(
self, revisions: List[Sha1Git], limit: Optional[int] = None
) -> Iterable[Optional[Dict[str, Any]]]:
seen: Set[Sha1Git] = set()
yield from self._get_parent_revs(revisions, seen, limit, False)
+ @timed
def revision_shortlog(
self, revisions: List[Sha1Git], limit: Optional[int] = None
) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]:
seen: Set[Sha1Git] = set()
yield from self._get_parent_revs(revisions, seen, limit, True)
+ @timed
def revision_get_random(self) -> Sha1Git:
revision = self._cql_runner.revision_get_random()
assert revision, "Could not find any revision"
return revision.id
+ @timed
+ @process_metrics
def release_add(self, releases: List[Release]) -> Dict[str, int]:
if not self._allow_overwrite:
to_add = {r.id: r for r in releases}.values()
@@ -703,9 +737,11 @@
return {"release:add": len(releases)}
+ @timed
def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]:
return self._cql_runner.release_missing(releases)
+ @timed
def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]:
rows = self._cql_runner.release_get(releases)
rels: Dict[Sha1Git, Release] = {}
@@ -715,11 +751,14 @@
return [rels.get(rel_id) for rel_id in releases]
+ @timed
def release_get_random(self) -> Sha1Git:
release = self._cql_runner.release_get_random()
assert release, "Could not find any release"
return release.id
+ @timed
+ @process_metrics
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
if not self._allow_overwrite:
to_add = {s.id: s for s in snapshots}.values()
@@ -753,9 +792,11 @@
return {"snapshot:add": len(snapshots)}
+ @timed
def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]:
return self._cql_runner.snapshot_missing(snapshots)
+ @timed
def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]:
d = self.snapshot_get_branches(snapshot_id)
if d is None:
@@ -769,6 +810,7 @@
"next_branch": d["next_branch"],
}
+ @timed
def snapshot_count_branches(
self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None,
) -> Optional[Dict[Optional[str], int]]:
@@ -781,6 +823,7 @@
snapshot_id, branch_name_exclude_prefix
)
+ @timed
def snapshot_get_branches(
self,
snapshot_id: Sha1Git,
@@ -856,11 +899,13 @@
next_branch=last_branch,
)
+ @timed
def snapshot_get_random(self) -> Sha1Git:
snapshot = self._cql_runner.snapshot_get_random()
assert snapshot, "Could not find any snapshot"
return snapshot.id
+ @timed
def object_find_by_sha1_git(self, ids: List[Sha1Git]) -> Dict[Sha1Git, List[Dict]]:
results: Dict[Sha1Git, List[Dict]] = {id_: [] for id_ in ids}
missing_ids = set(ids)
@@ -888,9 +933,11 @@
return results
+ @timed
def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
return [self.origin_get_one(origin) for origin in origins]
+ @timed
def origin_get_one(self, origin_url: str) -> Optional[Origin]:
"""Given an origin url, return the origin if it exists, None otherwise
@@ -902,6 +949,7 @@
else:
return None
+ @timed
def origin_get_by_sha1(self, sha1s: List[bytes]) -> List[Optional[Dict[str, Any]]]:
results = []
for sha1 in sha1s:
@@ -910,6 +958,7 @@
results.append(origin)
return results
+ @timed
def origin_list(
self, page_token: Optional[str] = None, limit: int = 100
) -> PagedResult[Origin]:
@@ -938,6 +987,7 @@
return PagedResult(results=origins, next_page_token=next_page_token)
+ @timed
def origin_search(
self,
url_pattern: str,
@@ -985,6 +1035,7 @@
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
+ @timed
def origin_count(
self, url_pattern: str, regexp: bool = False, with_visit: bool = False
) -> int:
@@ -992,6 +1043,8 @@
"The Cassandra backend does not implement origin_count"
)
+ @timed
+ @process_metrics
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
if not self._allow_overwrite:
to_add = {o.url: o for o in origins}.values()
@@ -1004,6 +1057,7 @@
)
return {"origin:add": len(origins)}
+ @timed
def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
for visit in visits:
origin = self.origin_get_one(visit.origin)
@@ -1037,7 +1091,7 @@
snapshot=None,
)
)
-
+ send_metric("origin_visit:add", count=nb_visits, method_name="origin_visit")
return all_visits
def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
@@ -1058,6 +1112,8 @@
converters.visit_status_to_row(visit_status)
)
+ @timed
+ @process_metrics
def origin_visit_status_add(
self, visit_statuses: List[OriginVisitStatus]
) -> Dict[str, int]:
@@ -1111,6 +1167,7 @@
"date": visit.date.replace(tzinfo=datetime.timezone.utc),
}
+ @timed
def origin_visit_get(
self,
origin: str,
@@ -1139,6 +1196,7 @@
return PagedResult(results=visits, next_page_token=next_page_token)
+ @timed
def origin_visit_status_get(
self,
origin: str,
@@ -1165,6 +1223,7 @@
return PagedResult(results=visit_statuses, next_page_token=next_page_token)
+ @timed
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
) -> Optional[OriginVisit]:
@@ -1181,12 +1240,14 @@
return converters.row_to_visit(min(rows, key=key))
return None
+ @timed
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]:
row = self._cql_runner.origin_visit_get_one(origin, visit)
if row:
return converters.row_to_visit(row)
return None
+ @timed
def origin_visit_get_latest(
self,
origin: str,
@@ -1234,6 +1295,7 @@
type=latest_visit["type"],
)
+ @timed
def origin_visit_status_get_latest(
self,
origin_url: str,
@@ -1256,6 +1318,7 @@
return None
return converters.row_to_visit_status(rows[0])
+ @timed
def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]:
back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back
@@ -1271,6 +1334,7 @@
return visit_status
return None
+ @timed
def stat_counters(self):
rows = self._cql_runner.stat_counters()
keys = (
@@ -1287,9 +1351,12 @@
stats.update({row.object_type: row.count for row in rows})
return stats
+ @timed
def refresh_stat_counters(self):
pass
+ @timed
+ @process_metrics
def raw_extrinsic_metadata_add(
self, metadata: List[RawExtrinsicMetadata]
) -> Dict[str, int]:
@@ -1349,6 +1416,7 @@
f"{type.value}_metadata:add": count for (type, count) in counter.items()
}
+ @timed
def raw_extrinsic_metadata_get(
self,
target: ExtendedSWHID,
@@ -1396,6 +1464,7 @@
return PagedResult(next_page_token=next_page_token, results=results,)
+ @timed
def raw_extrinsic_metadata_get_by_ids(
self, ids: List[Sha1Git]
) -> List[RawExtrinsicMetadata]:
@@ -1418,6 +1487,7 @@
return list(results)
+ @timed
def raw_extrinsic_metadata_get_authorities(
self, target: ExtendedSWHID
) -> List[MetadataAuthority]:
@@ -1430,6 +1500,8 @@
)
]
+ @timed
+ @process_metrics
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]:
self.journal_writer.metadata_fetcher_add(fetchers)
for fetcher in fetchers:
@@ -1438,6 +1510,7 @@
)
return {"metadata_fetcher:add": len(fetchers)}
+ @timed
def metadata_fetcher_get(
self, name: str, version: str
) -> Optional[MetadataFetcher]:
@@ -1447,6 +1520,8 @@
else:
return None
+ @timed
+ @process_metrics
def metadata_authority_add(
self, authorities: List[MetadataAuthority]
) -> Dict[str, int]:
@@ -1457,6 +1532,7 @@
)
return {"metadata_authority:add": len(authorities)}
+ @timed
def metadata_authority_get(
self, type: MetadataAuthorityType, url: str
) -> Optional[MetadataAuthority]:
@@ -1469,6 +1545,8 @@
return None
# ExtID tables
+ @timed
+ @process_metrics
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]:
if not self._allow_overwrite:
extids = [
@@ -1506,6 +1584,7 @@
inserted += 1
return {"extid:add": inserted}
+ @timed
def extid_get_from_extid(self, id_type: str, ids: List[bytes]) -> List[ExtID]:
result: List[ExtID] = []
for extid in ids:
@@ -1523,6 +1602,7 @@
)
return result
+ @timed
def extid_get_from_target(
self, target_type: SwhidObjectType, ids: List[Sha1Git]
) -> List[ExtID]:
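
For readers unfamiliar with the helpers this diff imports, the sketch below shows one plausible shape for timed, send_metric, and process_metrics. It is a minimal illustration, not the actual swh.storage.metrics implementation: the statsd client, metric names, and naming scheme (name suffixes rather than tags) are all assumptions.

    # Hypothetical sketch of the metrics helpers imported by this diff.
    # The real implementations live in swh.storage.metrics; the client,
    # metric names, and naming conventions below are assumptions.
    from functools import wraps
    from typing import Any, Callable, Dict

    from statsd import StatsClient  # assumed client, not necessarily the one swh uses

    statsd = StatsClient(prefix="swh_storage")

    DURATION_METRIC = "request_duration_seconds"
    OPERATIONS_METRIC = "operations_total"


    def timed(f: Callable) -> Callable:
        """Report each call's wall-clock duration as a statsd timer,
        identified (here: suffixed) by the endpoint name."""

        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with statsd.timer(f"{DURATION_METRIC}.{f.__name__}"):
                return f(*args, **kwargs)

        return wrapper


    def send_metric(metric: str, count: int, method_name: str) -> None:
        """Increment an operation counter for a key such as 'origin_visit:add'."""
        object_type, operation = metric.split(":")
        statsd.incr(
            f"{OPERATIONS_METRIC}.{object_type}.{operation}.{method_name}", count
        )


    def process_metrics(f: Callable) -> Callable:
        """Turn the Dict[str, int] summary returned by *_add methods
        (e.g. {'content:add': 12}) into one counter per key."""

        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Dict[str, int]:
            summary = f(*args, **kwargs)
            for metric, count in summary.items():
                send_metric(metric, count=count, method_name=f.__name__)
            return summary

        return wrapper

Under this reading, a method such as content_add reports its latency via @timed while @process_metrics turns each key of its returned summary into a counter; origin_visit_add returns the visits themselves rather than a summary dict, which is why the diff calls send_metric there explicitly instead of stacking @process_metrics.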