Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/base.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | import collections | ||||
from typing import Dict, Iterable | |||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def eventful(self) -> bool: | ||||
"""Whether the load was eventful""" | """Whether the load was eventful""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def store_data(self) -> None: | def store_data(self) -> None: | ||||
assert self.origin | assert self.origin | ||||
if self.save_data_path: | if self.save_data_path: | ||||
self.save_data() | self.save_data() | ||||
counts: Dict[str, int] = collections.defaultdict(int) | |||||
storage_summary: Dict[str, int] = collections.Counter() | |||||
if self.has_contents(): | if self.has_contents(): | ||||
for obj in self.get_contents(): | for obj in self.get_contents(): | ||||
if isinstance(obj, Content): | if isinstance(obj, Content): | ||||
self.storage.content_add([obj]) | counts["content"] += 1 | ||||
storage_summary.update(self.storage.content_add([obj])) | |||||
elif isinstance(obj, SkippedContent): | elif isinstance(obj, SkippedContent): | ||||
self.storage.skipped_content_add([obj]) | counts["skipped_content"] += 1 | ||||
storage_summary.update(self.storage.skipped_content_add([obj])) | |||||
else: | else: | ||||
raise TypeError(f"Unexpected content type: {obj}") | raise TypeError(f"Unexpected content type: {obj}") | ||||
if self.has_directories(): | if self.has_directories(): | ||||
for directory in self.get_directories(): | for directory in self.get_directories(): | ||||
self.storage.directory_add([directory]) | counts["directory"] += 1 | ||||
storage_summary.update(self.storage.directory_add([directory])) | |||||
if self.has_revisions(): | if self.has_revisions(): | ||||
for revision in self.get_revisions(): | for revision in self.get_revisions(): | ||||
self.storage.revision_add([revision]) | counts["revision"] += 1 | ||||
storage_summary.update(self.storage.revision_add([revision])) | |||||
if self.has_releases(): | if self.has_releases(): | ||||
for release in self.get_releases(): | for release in self.get_releases(): | ||||
self.storage.release_add([release]) | counts["release"] += 1 | ||||
storage_summary.update(self.storage.release_add([release])) | |||||
snapshot = self.get_snapshot() | snapshot = self.get_snapshot() | ||||
self.storage.snapshot_add([snapshot]) | counts["snapshot"] += 1 | ||||
self.flush() | storage_summary.update(self.storage.snapshot_add([snapshot])) | ||||
storage_summary.update(self.flush()) | |||||
self.loaded_snapshot_id = snapshot.id | self.loaded_snapshot_id = snapshot.id | ||||
for (object_type, total) in counts.items(): | |||||
filtered = total - storage_summary[f"{object_type}:add"] | |||||
assert 0 <= filtered <= total, (filtered, total) | |||||
if total == 0: | |||||
# No need to send it | |||||
continue | |||||
# cannot use self.statsd_average, because this is a weighted average | |||||
tags = {"object_type": object_type} | |||||
self.statsd.increment("filtered_objects_percent_sum", filtered, tags=tags) | |||||
self.statsd.increment("filtered_objects_percent_count", total, tags=tags) |