D6162.diff

diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020  The Software Heritage developers
+# Copyright (C) 2019-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -57,6 +57,7 @@
     PartialBranches,
     Sha1,
 )
+from swh.storage.metrics import process_metrics, send_metric, timed
 from swh.storage.objstorage import ObjStorage
 from swh.storage.utils import map_optional, now
 from swh.storage.writer import JournalWriter
@@ -136,6 +137,7 @@
             self._hosts, self._keyspace, self._port, self._consistency_level
         )
 
+    @timed
     def check_config(self, *, check_write: bool) -> bool:
         self._cql_runner.check_read()
 
@@ -238,6 +240,8 @@
 
         return summary
 
+    @timed
+    @process_metrics
     def content_add(self, content: List[Content]) -> Dict[str, int]:
         to_add = {
             (c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content
@@ -245,6 +249,7 @@
         contents = [attr.evolve(c, ctime=now()) for c in to_add]
         return self._content_add(list(contents), with_data=True)
 
+    @timed
     def content_update(
         self, contents: List[Dict[str, Any]], keys: List[str] = []
     ) -> None:
@@ -252,13 +257,17 @@
             "content_update is not supported by the Cassandra backend"
         )
 
+    @timed
+    @process_metrics
     def content_add_metadata(self, content: List[Content]) -> Dict[str, int]:
         return self._content_add(content, with_data=False)
 
+    @timed
     def content_get_data(self, content: Sha1) -> Optional[bytes]:
         # FIXME: Make this method support slicing the `data`
         return self.objstorage.content_get(content)
 
+    @timed
     def content_get_partition(
         self,
         partition_id: int,
@@ -300,6 +309,7 @@
         assert len(contents) <= limit
         return PagedResult(results=contents, next_page_token=next_page_token)
 
+    @timed
     def content_get(
         self, contents: List[bytes], algo: str = "sha1"
     ) -> List[Optional[Content]]:
@@ -320,6 +330,7 @@
             contents_by_hash[key(content)] = content
         return [contents_by_hash.get(hash_) for hash_ in contents]
 
+    @timed
     def content_find(self, content: Dict[str, Any]) -> List[Content]:
         # Find an algorithm that is common to all the requested contents.
         # It will be used to do an initial filtering efficiently.
@@ -347,6 +358,7 @@
             results.append(Content(**row_d))
         return results
 
+    @timed
     def content_missing(
         self, contents: List[Dict[str, Any]], key_hash: str = "sha1"
     ) -> Iterable[bytes]:
@@ -375,9 +387,11 @@
             if not res:
                 yield content[key_hash]
 
+    @timed
     def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]:
         return self.content_missing([{"sha1": c} for c in contents])
 
+    @timed
     def content_missing_per_sha1_git(
         self, contents: List[Sha1Git]
     ) -> Iterable[Sha1Git]:
@@ -385,6 +399,7 @@
             [{"sha1_git": c} for c in contents], key_hash="sha1_git"
         )
 
+    @timed
     def content_get_random(self) -> Sha1Git:
         content = self._cql_runner.content_get_random()
         assert content, "Could not find any content"
@@ -416,10 +431,13 @@
 
         return {"skipped_content:add": len(contents)}
 
+    @timed
+    @process_metrics
     def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
         contents = [attr.evolve(c, ctime=now()) for c in content]
         return self._skipped_content_add(contents)
 
+    @timed
     def skipped_content_missing(
         self, contents: List[Dict[str, Any]]
     ) -> Iterable[Dict[str, Any]]:
@@ -427,6 +445,8 @@
             if not self._cql_runner.skipped_content_get_from_pk(content):
                 yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
 
+    @timed
+    @process_metrics
     def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         to_add = {d.id: d for d in directories}.values()
         if not self._allow_overwrite:
@@ -450,6 +470,7 @@
 
         return {"directory:add": len(directories)}
 
+    @timed
     def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
         return self._cql_runner.directory_missing(directories)
 
@@ -504,6 +525,7 @@
                     ret["target"], True, prefix + ret["name"] + b"/"
                 )
 
+    @timed
     def directory_entry_get_by_path(
         self, directory: Sha1Git, paths: List[bytes]
     ) -> Optional[Dict[str, Any]]:
@@ -543,11 +565,13 @@
             first_item["target"], paths[1:], prefix + paths[0] + b"/"
         )
 
+    @timed
     def directory_ls(
         self, directory: Sha1Git, recursive: bool = False
     ) -> Iterable[Dict[str, Any]]:
         yield from self._directory_ls(directory, recursive)
 
+    @timed
     def directory_get_entries(
         self,
         directory_id: Sha1Git,
@@ -572,11 +596,14 @@
             next_page_token = None
         return PagedResult(results=entries, next_page_token=next_page_token)
 
+    @timed
     def directory_get_random(self) -> Sha1Git:
         directory = self._cql_runner.directory_get_random()
         assert directory, "Could not find any directory"
         return directory.id
 
+    @timed
+    @process_metrics
     def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         # Filter-out revisions already in the database
         if not self._allow_overwrite:
@@ -604,9 +631,11 @@
 
         return {"revision:add": len(revisions)}
 
+    @timed
     def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]:
         return self._cql_runner.revision_missing(revisions)
 
+    @timed
     def revision_get(self, revision_ids: List[Sha1Git]) -> List[Optional[Revision]]:
         rows = self._cql_runner.revision_get(revision_ids)
         revisions: Dict[Sha1Git, Revision] = {}
@@ -673,23 +702,28 @@
                 yield rev.to_dict()
         yield from self._get_parent_revs(parents, seen, limit, short)
 
+    @timed
     def revision_log(
         self, revisions: List[Sha1Git], limit: Optional[int] = None
     ) -> Iterable[Optional[Dict[str, Any]]]:
         seen: Set[Sha1Git] = set()
         yield from self._get_parent_revs(revisions, seen, limit, False)
 
+    @timed
     def revision_shortlog(
         self, revisions: List[Sha1Git], limit: Optional[int] = None
     ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]:
         seen: Set[Sha1Git] = set()
         yield from self._get_parent_revs(revisions, seen, limit, True)
 
+    @timed
     def revision_get_random(self) -> Sha1Git:
         revision = self._cql_runner.revision_get_random()
         assert revision, "Could not find any revision"
         return revision.id
 
+    @timed
+    @process_metrics
     def release_add(self, releases: List[Release]) -> Dict[str, int]:
         if not self._allow_overwrite:
             to_add = {r.id: r for r in releases}.values()
@@ -703,9 +737,11 @@
 
         return {"release:add": len(releases)}
 
+    @timed
     def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]:
         return self._cql_runner.release_missing(releases)
 
+    @timed
     def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]:
         rows = self._cql_runner.release_get(releases)
         rels: Dict[Sha1Git, Release] = {}
@@ -715,11 +751,14 @@
 
         return [rels.get(rel_id) for rel_id in releases]
 
+    @timed
     def release_get_random(self) -> Sha1Git:
         release = self._cql_runner.release_get_random()
         assert release, "Could not find any release"
         return release.id
 
+    @timed
+    @process_metrics
     def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
         if not self._allow_overwrite:
             to_add = {s.id: s for s in snapshots}.values()
@@ -753,9 +792,11 @@
 
         return {"snapshot:add": len(snapshots)}
 
+    @timed
     def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]:
         return self._cql_runner.snapshot_missing(snapshots)
 
+    @timed
     def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]:
         d = self.snapshot_get_branches(snapshot_id)
         if d is None:
@@ -769,6 +810,7 @@
             "next_branch": d["next_branch"],
         }
 
+    @timed
     def snapshot_count_branches(
         self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None,
     ) -> Optional[Dict[Optional[str], int]]:
@@ -781,6 +823,7 @@
             snapshot_id, branch_name_exclude_prefix
         )
 
+    @timed
     def snapshot_get_branches(
         self,
         snapshot_id: Sha1Git,
@@ -856,11 +899,13 @@
             next_branch=last_branch,
         )
 
+    @timed
     def snapshot_get_random(self) -> Sha1Git:
         snapshot = self._cql_runner.snapshot_get_random()
         assert snapshot, "Could not find any snapshot"
         return snapshot.id
 
+    @timed
     def object_find_by_sha1_git(self, ids: List[Sha1Git]) -> Dict[Sha1Git, List[Dict]]:
         results: Dict[Sha1Git, List[Dict]] = {id_: [] for id_ in ids}
         missing_ids = set(ids)
@@ -888,9 +933,11 @@
 
         return results
 
+    @timed
     def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
         return [self.origin_get_one(origin) for origin in origins]
 
+    @timed
     def origin_get_one(self, origin_url: str) -> Optional[Origin]:
         """Given an origin url, return the origin if it exists, None otherwise
 
@@ -902,6 +949,7 @@
         else:
             return None
 
+    @timed
     def origin_get_by_sha1(self, sha1s: List[bytes]) -> List[Optional[Dict[str, Any]]]:
         results = []
         for sha1 in sha1s:
@@ -910,6 +958,7 @@
                 results.append(origin)
         return results
 
+    @timed
     def origin_list(
         self, page_token: Optional[str] = None, limit: int = 100
     ) -> PagedResult[Origin]:
@@ -938,6 +987,7 @@
 
         return PagedResult(results=origins, next_page_token=next_page_token)
 
+    @timed
     def origin_search(
         self,
         url_pattern: str,
@@ -985,6 +1035,7 @@
         assert len(origins) <= limit
         return PagedResult(results=origins, next_page_token=next_page_token)
 
+    @timed
     def origin_count(
         self, url_pattern: str, regexp: bool = False, with_visit: bool = False
     ) -> int:
@@ -992,6 +1043,8 @@
             "The Cassandra backend does not implement origin_count"
         )
 
+    @timed
+    @process_metrics
    def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
         if not self._allow_overwrite:
             to_add = {o.url: o for o in origins}.values()
@@ -1004,6 +1057,7 @@
             )
         return {"origin:add": len(origins)}
 
+    @timed
     def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
         for visit in visits:
             origin = self.origin_get_one(visit.origin)
@@ -1037,7 +1091,7 @@
                     snapshot=None,
                 )
             )
-
+        send_metric("origin_visit:add", count=nb_visits, method_name="origin_visit")
         return all_visits
 
     def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
@@ -1058,6 +1112,8 @@
             converters.visit_status_to_row(visit_status)
         )
 
+    @timed
+    @process_metrics
     def origin_visit_status_add(
         self, visit_statuses: List[OriginVisitStatus]
     ) -> Dict[str, int]:
@@ -1111,6 +1167,7 @@
             "date": visit.date.replace(tzinfo=datetime.timezone.utc),
         }
 
+    @timed
     def origin_visit_get(
         self,
         origin: str,
@@ -1139,6 +1196,7 @@
 
         return PagedResult(results=visits, next_page_token=next_page_token)
 
+    @timed
     def origin_visit_status_get(
         self,
         origin: str,
@@ -1165,6 +1223,7 @@
 
         return PagedResult(results=visit_statuses, next_page_token=next_page_token)
 
+    @timed
     def origin_visit_find_by_date(
         self, origin: str, visit_date: datetime.datetime
     ) -> Optional[OriginVisit]:
@@ -1181,12 +1240,14 @@
             return converters.row_to_visit(min(rows, key=key))
         return None
 
+    @timed
     def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]:
         row = self._cql_runner.origin_visit_get_one(origin, visit)
         if row:
             return converters.row_to_visit(row)
         return None
 
+    @timed
     def origin_visit_get_latest(
         self,
         origin: str,
@@ -1234,6 +1295,7 @@
             type=latest_visit["type"],
         )
 
+    @timed
     def origin_visit_status_get_latest(
         self,
         origin_url: str,
@@ -1256,6 +1318,7 @@
             return None
         return converters.row_to_visit_status(rows[0])
 
+    @timed
     def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]:
         back_in_the_day = now() - datetime.timedelta(weeks=12)  # 3 months back
 
@@ -1271,6 +1334,7 @@
                 return visit_status
         return None
 
+    @timed
     def stat_counters(self):
         rows = self._cql_runner.stat_counters()
         keys = (
@@ -1287,9 +1351,12 @@
         stats.update({row.object_type: row.count for row in rows})
         return stats
 
+    @timed
     def refresh_stat_counters(self):
         pass
 
+    @timed
+    @process_metrics
     def raw_extrinsic_metadata_add(
         self, metadata: List[RawExtrinsicMetadata]
     ) -> Dict[str, int]:
@@ -1349,6 +1416,7 @@
             f"{type.value}_metadata:add": count for (type, count) in counter.items()
         }
 
+    @timed
     def raw_extrinsic_metadata_get(
         self,
         target: ExtendedSWHID,
@@ -1396,6 +1464,7 @@
 
         return PagedResult(next_page_token=next_page_token, results=results,)
 
+    @timed
     def raw_extrinsic_metadata_get_by_ids(
         self, ids: List[Sha1Git]
     ) -> List[RawExtrinsicMetadata]:
@@ -1418,6 +1487,7 @@
 
         return list(results)
 
+    @timed
     def raw_extrinsic_metadata_get_authorities(
         self, target: ExtendedSWHID
     ) -> List[MetadataAuthority]:
@@ -1430,6 +1500,8 @@
             )
         ]
 
+    @timed
+    @process_metrics
     def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]:
         self.journal_writer.metadata_fetcher_add(fetchers)
         for fetcher in fetchers:
@@ -1438,6 +1510,7 @@
             )
         return {"metadata_fetcher:add": len(fetchers)}
 
+    @timed
     def metadata_fetcher_get(
         self, name: str, version: str
     ) -> Optional[MetadataFetcher]:
@@ -1447,6 +1520,8 @@
         else:
             return None
 
+    @timed
+    @process_metrics
     def metadata_authority_add(
         self, authorities: List[MetadataAuthority]
     ) -> Dict[str, int]:
@@ -1457,6 +1532,7 @@
             )
         return {"metadata_authority:add": len(authorities)}
 
+    @timed
     def metadata_authority_get(
         self, type: MetadataAuthorityType, url: str
     ) -> Optional[MetadataAuthority]:
@@ -1469,6 +1545,8 @@
             return None
 
     # ExtID tables
+    @timed
+    @process_metrics
     def extid_add(self, ids: List[ExtID]) -> Dict[str, int]:
         if not self._allow_overwrite:
             extids = [
@@ -1506,6 +1584,7 @@
             inserted += 1
         return {"extid:add": inserted}
 
+    @timed
     def extid_get_from_extid(self, id_type: str, ids: List[bytes]) -> List[ExtID]:
         result: List[ExtID] = []
         for extid in ids:
@@ -1523,6 +1602,7 @@
             )
         return result
 
+    @timed
     def extid_get_from_target(
         self, target_type: SwhidObjectType, ids: List[Sha1Git]
    ) -> List[ExtID]:

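Note on the decorators this diff applies: `timed` wraps an endpoint and reports its wall-clock duration, while `process_metrics` consumes the `{"object_type:operation": count}` summary dict that the `*_add` endpoints return and turns each entry into a counter increment. `origin_visit_add` returns the visits themselves rather than a summary dict, which is why it calls `send_metric` directly with the visit count. The sketch below illustrates the pattern only; it assumes a statsd-style client such as the one in `swh.core.statsd` and simplified metric names, so the actual `swh.storage.metrics` module may differ in detail.

# Illustrative sketch of the decorator pattern, not the actual swh.storage.metrics source.
from functools import wraps
from typing import Callable, Dict

from swh.core.statsd import statsd  # assumed statsd-style client

DURATION_METRIC = "swh_storage_request_duration_seconds"
OPERATIONS_METRIC = "swh_storage_operations_total"


def timed(f: Callable) -> Callable:
    """Report the duration of each call, tagged with the endpoint name."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
            return f(*args, **kwargs)

    return wrapper


def send_metric(metric: str, count: int, method_name: str) -> bool:
    """Increment a counter for one "object_type:operation[:unit]" metric."""
    if count == 0:
        return False
    parts = metric.split(":")
    if len(parts) == 2:
        object_type, operation = parts
        metric_name = OPERATIONS_METRIC
    elif len(parts) == 3:  # e.g. "content:add:bytes"
        object_type, operation, unit = parts
        metric_name = f"swh_storage_operations_{unit}_total"
    else:
        return False  # malformed metric name, drop it
    statsd.increment(
        metric_name,
        count,
        tags={
            "endpoint": method_name,
            "object_type": object_type,
            "operation": operation,
        },
    )
    return True


def process_metrics(f: Callable) -> Callable:
    """Feed the {"object:add": count} summary returned by *_add endpoints
    into send_metric, one counter increment per entry."""

    @wraps(f)
    def wrapper(*args, **kwargs) -> Dict[str, int]:
        summary = f(*args, **kwargs)
        for metric, count in summary.items():
            send_metric(metric=metric, count=count, method_name=f.__name__)
        return summary

    return wrapper

With decorators of this shape in place, a call such as storage.content_add(contents) is timed under its endpoint name and increments the "content"/"add" counter by the number of objects actually inserted, since the summary dict reflects only newly added objects.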