diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -90,7 +90,7 @@
             raise AttributeError(key)
         return getattr(self.storage, key)
 
-    def content_add(self, contents: Sequence[Content]) -> Dict:
+    def content_add(self, contents: Sequence[Content]) -> Dict[str, int]:
         """Push contents to write to the storage in the buffer.
 
         Following policies apply:
@@ -112,7 +112,7 @@
 
         return stats
 
-    def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict:
+    def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]:
         return self.object_add(
             contents,
             object_type="skipped_content",
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -12,6 +12,7 @@
 from typing import (
     Any,
     Callable,
+    Counter,
     Dict,
     Iterable,
     List,
@@ -27,7 +28,7 @@
 from swh.core.api.classes import stream_results
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
 from swh.model.hashutil import DEFAULT_ALGORITHMS
-from swh.model.identifiers import CoreSWHID, ExtendedSWHID
+from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID
 from swh.model.identifiers import ObjectType as SwhidObjectType
 from swh.model.model import (
     Content,
@@ -117,7 +118,7 @@
             if getattr(row, algo) == hash_:
                 yield row
 
-    def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
+    def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]:
         # Filter-out content already in the database.
         contents = [
             c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict())
@@ -192,7 +193,7 @@
 
         return summary
 
-    def content_add(self, content: List[Content]) -> Dict:
+    def content_add(self, content: List[Content]) -> Dict[str, int]:
         to_add = {
             (c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content
         }.values()
@@ -206,7 +207,7 @@
             "content_update is not supported by the Cassandra backend"
         )
 
-    def content_add_metadata(self, content: List[Content]) -> Dict:
+    def content_add_metadata(self, content: List[Content]) -> Dict[str, int]:
         return self._content_add(content, with_data=False)
 
     def content_get_data(self, content: Sha1) -> Optional[bytes]:
@@ -321,7 +322,7 @@
         assert content, "Could not find any content"
         return content.sha1_git
 
-    def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
+    def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict[str, int]:
         # Filter-out content already in the database.
         contents = [
             c
@@ -346,7 +347,7 @@
 
         return {"skipped_content:add": len(contents)}
 
-    def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
+    def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
         contents = [attr.evolve(c, ctime=now()) for c in content]
         return self._skipped_content_add(contents)
 
@@ -357,7 +358,7 @@
         if not self._cql_runner.skipped_content_get_from_pk(content):
             yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
 
-    def directory_add(self, directories: List[Directory]) -> Dict:
+    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         to_add = {d.id: d for d in directories}.values()
         # Filter out directories that are already inserted.
         missing = self.directory_missing([dir_.id for dir_ in to_add])
@@ -482,7 +483,7 @@
         assert directory, "Could not find any directory"
         return directory.id
 
-    def revision_add(self, revisions: List[Revision]) -> Dict:
+    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         # Filter-out revisions already in the database
         to_add = {r.id: r for r in revisions}.values()
         missing = self.revision_missing([rev.id for rev in to_add])
@@ -594,7 +595,7 @@
         assert revision, "Could not find any revision"
         return revision.id
 
-    def release_add(self, releases: List[Release]) -> Dict:
+    def release_add(self, releases: List[Release]) -> Dict[str, int]:
         to_add = {r.id: r for r in releases}.values()
         missing = set(self.release_missing([rel.id for rel in to_add]))
         releases = [rel for rel in to_add if rel.id in missing]
@@ -623,7 +624,7 @@
         assert release, "Could not find any release"
         return release.id
 
-    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
+    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
         to_add = {s.id: s for s in snapshots}.values()
         missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add])
         snapshots = [snp for snp in snapshots if snp.id in missing]
@@ -955,7 +956,9 @@
             converters.visit_status_to_row(visit_status)
         )
 
-    def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus]) -> None:
+    def origin_visit_status_add(
+        self, visit_statuses: List[OriginVisitStatus]
+    ) -> Dict[str, int]:
         # First round to check existence (fail early if any is ko)
         for visit_status in visit_statuses:
             origin_url = self.origin_get_one(visit_status.origin)
@@ -964,6 +967,7 @@
 
         for visit_status in visit_statuses:
             self._origin_visit_status_add(visit_status)
+        return {"origin_visit_status:add": len(visit_statuses)}
 
     def _origin_visit_apply_status(
         self, visit: Dict[str, Any], visit_status: OriginVisitStatusRow
@@ -1183,8 +1187,11 @@
     def refresh_stat_counters(self):
         pass
 
-    def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata]) -> None:
+    def raw_extrinsic_metadata_add(
+        self, metadata: List[RawExtrinsicMetadata]
+    ) -> Dict[str, int]:
         self.journal_writer.raw_extrinsic_metadata_add(metadata)
+        counter = Counter[ExtendedObjectType]()
         for metadata_entry in metadata:
             if not self._cql_runner.metadata_authority_get(
                 metadata_entry.authority.type.value, metadata_entry.authority.url
@@ -1220,8 +1227,12 @@
                     directory=map_optional(str, metadata_entry.directory),
                 )
                 self._cql_runner.raw_extrinsic_metadata_add(row)
+                counter[metadata_entry.target.object_type] += 1
             except TypeError as e:
                 raise StorageArgumentException(*e.args)
+        return {
+            f"{type.value}_metadata:add": count for (type, count) in counter.items()
+        }
 
     def raw_extrinsic_metadata_get(
         self,
@@ -1293,7 +1304,7 @@
 
         return PagedResult(next_page_token=next_page_token, results=results,)
 
-    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None:
+    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> Dict[str, int]:
         self.journal_writer.metadata_fetcher_add(fetchers)
         for fetcher in fetchers:
             self._cql_runner.metadata_fetcher_add(
@@ -1303,6 +1314,7 @@
                     metadata=json.dumps(map_optional(dict, fetcher.metadata)),
                 )
             )
+        return {"metadata_fetcher:add": len(fetchers)}
 
     def metadata_fetcher_get(
         self, name: str, version: str
@@ -1317,7 +1329,9 @@
         else:
             return None
 
-    def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
+    def metadata_authority_add(
+        self, authorities: List[MetadataAuthority]
+    ) -> Dict[str, int]:
         self.journal_writer.metadata_authority_add(authorities)
         for authority in authorities:
             self._cql_runner.metadata_authority_add(
@@ -1327,6 +1341,7 @@
                     metadata=json.dumps(map_optional(dict, authority.metadata)),
                 )
             )
+        return {"metadata_authority:add": len(authorities)}
 
     def metadata_authority_get(
         self, type: MetadataAuthorityType, url: str
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -37,25 +37,25 @@
             raise AttributeError(key)
         return getattr(self.storage, key)
 
-    def content_add(self, content: List[Content]) -> Dict:
+    def content_add(self, content: List[Content]) -> Dict[str, int]:
         contents_to_add = self._filter_missing_contents(content)
         return self.storage.content_add(
             [x for x in content if x.sha256 in contents_to_add]
         )
 
-    def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
+    def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
         contents_to_add = self._filter_missing_skipped_contents(content)
         return self.storage.skipped_content_add(
             [x for x in content if x.sha1_git is None or x.sha1_git in contents_to_add]
         )
 
-    def directory_add(self, directories: List[Directory]) -> Dict:
+    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         missing_ids = self._filter_missing_ids("directory", (d.id for d in directories))
         return self.storage.directory_add(
             [d for d in directories if d.id in missing_ids]
         )
 
-    def revision_add(self, revisions: List[Revision]) -> Dict:
+    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions))
         return self.storage.revision_add([r for r in revisions if r.id in missing_ids])
 
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -74,7 +74,7 @@
         ...
 
     @remote_api_endpoint("content/add")
-    def content_add(self, content: List[Content]) -> Dict:
+    def content_add(self, content: List[Content]) -> Dict[str, int]:
         """Add content blobs to the storage
 
         Args:
@@ -134,7 +134,7 @@
         ...
 
     @remote_api_endpoint("content/add_metadata")
-    def content_add_metadata(self, content: List[Content]) -> Dict:
+    def content_add_metadata(self, content: List[Content]) -> Dict[str, int]:
         """Add content metadata to the storage (like `content_add`, but
         without inserting to the objstorage).
 
@@ -297,7 +297,7 @@
         ...
 
     @remote_api_endpoint("content/skipped/add")
-    def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
+    def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
         """Add contents to the skipped_content list, which contains
         (partial) information about content missing from the archive.
 
@@ -350,7 +350,7 @@
         ...
 
     @remote_api_endpoint("directory/add")
-    def directory_add(self, directories: List[Directory]) -> Dict:
+    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         """Add directories to the storage
 
         Args:
@@ -436,7 +436,7 @@
         ...
 
     @remote_api_endpoint("revision/add")
-    def revision_add(self, revisions: List[Revision]) -> Dict:
+    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         """Add revisions to the storage
 
         Args:
@@ -587,7 +587,7 @@
         ...
 
     @remote_api_endpoint("release/add")
-    def release_add(self, releases: List[Release]) -> Dict:
+    def release_add(self, releases: List[Release]) -> Dict[str, int]:
         """Add releases to the storage
 
         Args:
@@ -652,7 +652,7 @@
         ...
 
     @remote_api_endpoint("snapshot/add")
-    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
+    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
         """Add snapshots to the storage.
 
         Args:
@@ -810,7 +810,9 @@
         ...
 
     @remote_api_endpoint("origin/visit_status/add")
-    def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus],) -> None:
+    def origin_visit_status_add(
+        self, visit_statuses: List[OriginVisitStatus],
+    ) -> Dict[str, int]:
         """Add origin visit statuses.
 
         If there is already a status for the same origin and visit id at the same
@@ -1132,7 +1134,9 @@
         ...
 
     @remote_api_endpoint("raw_extrinsic_metadata/add")
-    def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
+    def raw_extrinsic_metadata_add(
+        self, metadata: List[RawExtrinsicMetadata],
+    ) -> Dict[str, int]:
         """Add extrinsic metadata on objects (contents, directories, ...).
 
         The authority and fetcher must be known to the storage before
@@ -1173,7 +1177,7 @@
         ...
 
     @remote_api_endpoint("metadata_fetcher/add")
-    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None:
+    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> Dict[str, int]:
         """Add new metadata fetchers to the storage.
 
         Their `name` and `version` together are unique identifiers of this
@@ -1205,7 +1209,9 @@
         ...
 
     @remote_api_endpoint("metadata_authority/add")
-    def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
+    def metadata_authority_add(
+        self, authorities: List[MetadataAuthority]
+    ) -> Dict[str, int]:
         """Add new metadata authorities to the storage.
 
         Their `type` and `url` together are unique identifiers of this
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -210,7 +210,7 @@
 
     @timed
     @process_metrics
-    def content_add(self, content: List[Content]) -> Dict:
+    def content_add(self, content: List[Content]) -> Dict[str, int]:
         ctime = now()
         contents = [attr.evolve(c, ctime=ctime) for c in content]
 
@@ -260,7 +260,9 @@
     @timed
     @process_metrics
     @db_transaction()
-    def content_add_metadata(self, content: List[Content], db=None, cur=None) -> Dict:
+    def content_add_metadata(
+        self, content: List[Content], db=None, cur=None
+    ) -> Dict[str, int]:
         missing = self.content_missing(
             (c.to_dict() for c in content), key_hash="sha1_git", db=db, cur=cur,
         )
@@ -419,7 +421,7 @@
     @db_transaction()
     def skipped_content_add(
         self, content: List[SkippedContent], db=None, cur=None
-    ) -> Dict:
+    ) -> Dict[str, int]:
         ctime = now()
         content = [attr.evolve(c, ctime=ctime) for c in content]
 
@@ -457,7 +459,9 @@
     @timed
     @process_metrics
     @db_transaction()
-    def directory_add(self, directories: List[Directory], db=None, cur=None) -> Dict:
+    def directory_add(
+        self, directories: List[Directory], db=None, cur=None
+    ) -> Dict[str, int]:
         summary = {"directory:add": 0}
 
         dirs = set()
@@ -548,7 +552,9 @@
     @timed
     @process_metrics
     @db_transaction()
-    def revision_add(self, revisions: List[Revision], db=None, cur=None) -> Dict:
+    def revision_add(
+        self, revisions: List[Revision], db=None, cur=None
+    ) -> Dict[str, int]:
         summary = {"revision:add": 0}
 
         revisions_missing = set(
@@ -687,7 +693,7 @@
     @timed
     @process_metrics
     @db_transaction()
-    def release_add(self, releases: List[Release], db=None, cur=None) -> Dict:
+    def release_add(self, releases: List[Release], db=None, cur=None) -> Dict[str, int]:
         summary = {"release:add": 0}
 
         release_ids = set(release.id for release in releases)
@@ -743,7 +749,9 @@
     @timed
     @process_metrics
     @db_transaction()
-    def snapshot_add(self, snapshots: List[Snapshot], db=None, cur=None) -> Dict:
+    def snapshot_add(
+        self, snapshots: List[Snapshot], db=None, cur=None
+    ) -> Dict[str, int]:
         created_temp_table = False
         count = 0
 
@@ -929,15 +937,13 @@
         """Add an origin visit status"""
         self.journal_writer.origin_visit_status_add([visit_status])
         db.origin_visit_status_add(visit_status, cur=cur)
-        send_metric(
-            "origin_visit_status:add", count=1, method_name="origin_visit_status"
-        )
 
     @timed
+    @process_metrics
     @db_transaction()
     def origin_visit_status_add(
         self, visit_statuses: List[OriginVisitStatus], db=None, cur=None,
-    ) -> None:
+    ) -> Dict[str, int]:
         visit_statuses_ = []
         # First round to check existence (fail early if any is ko)
@@ -964,6 +970,7 @@
 
         for visit_status in visit_statuses_:
             self._origin_visit_status_add(visit_status, db, cur)
+        return {"origin_visit_status:add": len(visit_statuses_)}
 
     @timed
     @db_transaction()
@@ -1300,10 +1307,12 @@
         for key in keys:
             cur.execute("select * from swh_update_counter(%s)", (key,))
 
+    @timed
+    @process_metrics
     @db_transaction()
     def raw_extrinsic_metadata_add(
         self, metadata: List[RawExtrinsicMetadata], db, cur,
-    ) -> None:
+    ) -> Dict[str, int]:
         metadata = list(metadata)
         self.journal_writer.raw_extrinsic_metadata_add(metadata)
         counter = Counter[ExtendedObjectType]()
@@ -1331,12 +1340,9 @@
             )
             counter[metadata_entry.target.object_type] += 1
 
-        for (type, count) in counter.items():
-            send_metric(
-                f"{type.value}_metadata:add",
-                count=count,
-                method_name=f"{type.name.lower()}_metadata_add",
-            )
+        return {
+            f"{type.value}_metadata:add": count for (type, count) in counter.items()
+        }
 
     @db_transaction()
     def raw_extrinsic_metadata_get(
@@ -1390,10 +1396,11 @@
         return PagedResult(next_page_token=next_page_token, results=results,)
 
     @timed
+    @process_metrics
     @db_transaction()
     def metadata_fetcher_add(
         self, fetchers: List[MetadataFetcher], db=None, cur=None
-    ) -> None:
+    ) -> Dict[str, int]:
         fetchers = list(fetchers)
         self.journal_writer.metadata_fetcher_add(fetchers)
         count = 0
@@ -1406,7 +1413,7 @@
                 fetcher.name, fetcher.version, dict(fetcher.metadata), cur=cur
             )
             count += 1
-        send_metric("metadata_fetcher:add", count=count, method_name="metadata_fetcher")
+        return {"metadata_fetcher:add": count}
 
     @timed
     @db_transaction(statement_timeout=500)
@@ -1419,10 +1426,11 @@
             return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row)))
 
     @timed
+    @process_metrics
     @db_transaction()
     def metadata_authority_add(
         self, authorities: List[MetadataAuthority], db=None, cur=None
-    ) -> None:
+    ) -> Dict[str, int]:
         authorities = list(authorities)
         self.journal_writer.metadata_authority_add(authorities)
         count = 0
@@ -1436,9 +1444,7 @@
                 authority.type.value, authority.url, dict(authority.metadata), cur=cur
             )
             count += 1
-        send_metric(
-            "metadata_authority:add", count=count, method_name="metadata_authority"
-        )
+        return {"metadata_authority:add": count}
 
     @timed
     @db_transaction()
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -2176,7 +2176,8 @@
             snapshot=None,
             metadata={"intrinsic": "something"},
         )
-        swh_storage.origin_visit_status_add([visit_status1, visit_status2])
+        stats = swh_storage.origin_visit_status_add([visit_status1, visit_status2])
+        assert stats == {"origin_visit_status:add": 2}
 
         visit = swh_storage.origin_visit_get_latest(origin1.url, require_snapshot=True)
         visit_status = swh_storage.origin_visit_status_get_latest(
@@ -2241,9 +2242,12 @@
             snapshot=snapshot.id,
         )
 
-        swh_storage.origin_visit_status_add([visit_status1])
+        stats = swh_storage.origin_visit_status_add([visit_status1])
+        assert stats == {"origin_visit_status:add": 1}
         # second call will ignore existing entries (will send to storage though)
-        swh_storage.origin_visit_status_add([visit_status1])
+        stats = swh_storage.origin_visit_status_add([visit_status1])
+        # ...so the storage still returns it as an addition
+        assert stats == {"origin_visit_status:add": 1}
 
         visit_status = swh_storage.origin_visit_status_get_latest(ov1.origin, ov1.visit)
         assert visit_status == visit_status1
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -45,7 +45,7 @@
                     f"but it should be {hash_to_hex(id_)}: {obj}"
                 )
 
-    def content_add(self, content: List[Content]) -> Dict:
+    def content_add(self, content: List[Content]) -> Dict[str, int]:
         for cont in content:
             hashes = MultiHash.from_data(cont.data).digest()
             if hashes != cont.hashes():
@@ -54,18 +54,18 @@
             )
         return self.storage.content_add(content)
 
-    def directory_add(self, directories: List[Directory]) -> Dict:
+    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         self._check_hashes(directories)
         return self.storage.directory_add(directories)
 
-    def revision_add(self, revisions: List[Revision]) -> Dict:
+    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         self._check_hashes(revisions)
         return self.storage.revision_add(revisions)
 
-    def release_add(self, releases: List[Release]) -> Dict:
+    def release_add(self, releases: List[Release]) -> Dict[str, int]:
         self._check_hashes(releases)
         return self.storage.release_add(releases)
 
-    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
+    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
         self._check_hashes(snapshots)
         return self.storage.snapshot_add(snapshots)
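
Taken together, these changes make every *_add endpoint return an object-count summary such as {"origin_visit_status:add": 2}, and move metric emission out of the individual methods and into the process_metrics decorator. A minimal Python sketch of that pattern, assuming a statsd-style send_metric helper (the names below are illustrative, not the actual swh.storage.metrics implementation):

from functools import wraps
from typing import Callable, Dict


def send_metric(metric: str, count: int, method_name: str) -> None:
    # Hypothetical stand-in for a statsd-style metric sender.
    print(f"{metric}={count} (from {method_name})")


def process_metrics(f: Callable[..., Dict[str, int]]) -> Callable[..., Dict[str, int]]:
    # Emit one metric per "<object_type>:add" entry of the returned summary.
    @wraps(f)
    def wrapper(*args, **kwargs) -> Dict[str, int]:
        summary = f(*args, **kwargs)
        for key, count in summary.items():
            send_metric(key, count=count, method_name=f.__name__)
        return summary

    return wrapper


@process_metrics
def origin_visit_status_add(visit_statuses: list) -> Dict[str, int]:
    # ... persist the statuses, then report how many were added ...
    return {"origin_visit_status:add": len(visit_statuses)}


stats = origin_visit_status_add(["status1", "status2"])
assert stats == {"origin_visit_status:add": 2}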