Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/replay.py
Show All 13 Lines | |||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.storage.fixer import fix_objects | from swh.storage.fixer import fix_objects | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
BaseModel, | BaseModel, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
MetadataAuthority, | |||||
MetadataFetcher, | |||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | |||||
Release, | |||||
Revision, | Revision, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
Release, | |||||
) | ) | ||||
from swh.storage.exc import HashCollision | from swh.storage.exc import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | ||||
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | ||||
object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { | object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { | ||||
"origin": Origin.from_dict, | "origin": Origin.from_dict, | ||||
"origin_visit": OriginVisit.from_dict, | "origin_visit": OriginVisit.from_dict, | ||||
"origin_visit_status": OriginVisitStatus.from_dict, | "origin_visit_status": OriginVisitStatus.from_dict, | ||||
"snapshot": Snapshot.from_dict, | "snapshot": Snapshot.from_dict, | ||||
"revision": Revision.from_dict, | "revision": Revision.from_dict, | ||||
"release": Release.from_dict, | "release": Release.from_dict, | ||||
"directory": Directory.from_dict, | "directory": Directory.from_dict, | ||||
"content": Content.from_dict, | "content": Content.from_dict, | ||||
"skipped_content": SkippedContent.from_dict, | "skipped_content": SkippedContent.from_dict, | ||||
"metadata_authority": MetadataAuthority.from_dict, | |||||
"metadata_fetcher": MetadataFetcher.from_dict, | |||||
"raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, | |||||
} | } | ||||
def process_replay_objects(all_objects, *, storage): | def process_replay_objects(all_objects, *, storage): | ||||
for (object_type, objects) in all_objects.items(): | for (object_type, objects) in all_objects.items(): | ||||
logger.debug("Inserting %s %s objects", len(objects), object_type) | logger.debug("Inserting %s %s objects", len(objects), object_type) | ||||
with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): | with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
▲ Show 20 Lines • Show All 80 Lines • Show Last 20 Lines |