Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/replay.py
Show All 11 Lines | except ImportError: | ||||
notify = None | notify = None | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
BaseModel, | BaseModel, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
ExtID, | |||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Show All 17 Lines | object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { | ||||
"revision": Revision.from_dict, | "revision": Revision.from_dict, | ||||
"release": Release.from_dict, | "release": Release.from_dict, | ||||
"directory": Directory.from_dict, | "directory": Directory.from_dict, | ||||
"content": Content.from_dict, | "content": Content.from_dict, | ||||
"skipped_content": SkippedContent.from_dict, | "skipped_content": SkippedContent.from_dict, | ||||
"metadata_authority": MetadataAuthority.from_dict, | "metadata_authority": MetadataAuthority.from_dict, | ||||
"metadata_fetcher": MetadataFetcher.from_dict, | "metadata_fetcher": MetadataFetcher.from_dict, | ||||
"raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, | "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, | ||||
"extid": ExtID.from_dict, | |||||
} | } | ||||
def process_replay_objects(all_objects, *, storage): | def process_replay_objects(all_objects, *, storage): | ||||
for (object_type, objects) in all_objects.items(): | for (object_type, objects) in all_objects.items(): | ||||
logger.debug("Inserting %s %s objects", len(objects), object_type) | logger.debug("Inserting %s %s objects", len(objects), object_type) | ||||
with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): | with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | elif object_type == "raw_extrinsic_metadata": | ||||
converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] | converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] | ||||
authorities = {emd.authority for emd in converted} | authorities = {emd.authority for emd in converted} | ||||
fetchers = {emd.fetcher for emd in converted} | fetchers = {emd.fetcher for emd in converted} | ||||
storage.metadata_authority_add(authorities) | storage.metadata_authority_add(authorities) | ||||
storage.metadata_fetcher_add(fetchers) | storage.metadata_fetcher_add(fetchers) | ||||
storage.raw_extrinsic_metadata_add(converted) | storage.raw_extrinsic_metadata_add(converted) | ||||
elif object_type in ( | elif object_type in ( | ||||
"directory", | "directory", | ||||
"extid", | |||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"origin", | "origin", | ||||
"metadata_fetcher", | "metadata_fetcher", | ||||
"metadata_authority", | "metadata_authority", | ||||
): | ): | ||||
method = getattr(storage, object_type + "_add") | method = getattr(storage, object_type + "_add") | ||||
method([object_converter_fn[object_type](o) for o in objects]) | method([object_converter_fn[object_type](o) for o in objects]) | ||||
else: | else: | ||||
logger.warning("Received a series of %s, this should not happen", object_type) | logger.warning("Received a series of %s, this should not happen", object_type) |