Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/replay.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from typing import Any, Callable, Dict, Iterable, List | from typing import Any, Callable, Container, Dict, Iterable, List | ||||
try: | try: | ||||
from systemd.daemon import notify | from systemd.daemon import notify | ||||
except ImportError: | except ImportError: | ||||
notify = None | notify = None | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | while True: | ||||
else: | else: | ||||
# Successfully added contents, we are done | # Successfully added contents, we are done | ||||
break | break | ||||
if colliding_content_hashes: | if colliding_content_hashes: | ||||
for collision in colliding_content_hashes: | for collision in colliding_content_hashes: | ||||
logger.error("Collision detected: %(collision)s", {"collision": collision}) | logger.error("Collision detected: %(collision)s", {"collision": collision}) | ||||
def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict: | |||||
"""Returns a copy of the dict d without any key listed in keys_to_drop""" | |||||
return {k: v for (k, v) in d.items() if k not in keys_to_drop} | |||||
def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: | ||||
"""Insert objects of type object_type in the storage. | """Insert objects of type object_type in the storage. | ||||
""" | """ | ||||
objects = fix_objects(object_type, objects) | objects = fix_objects(object_type, objects) | ||||
if object_type == "content": | if object_type == "content": | ||||
# for bw compat, skipped content should now be delivered in the skipped_content | # for bw compat, skipped content should now be delivered in the skipped_content | ||||
Show All 27 Lines | elif object_type in ("origin_visit", "origin_visit_status"): | ||||
method(model_objs) | method(model_objs) | ||||
elif object_type == "raw_extrinsic_metadata": | elif object_type == "raw_extrinsic_metadata": | ||||
converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] | converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] | ||||
authorities = {emd.authority for emd in converted} | authorities = {emd.authority for emd in converted} | ||||
fetchers = {emd.fetcher for emd in converted} | fetchers = {emd.fetcher for emd in converted} | ||||
storage.metadata_authority_add(authorities) | storage.metadata_authority_add(authorities) | ||||
storage.metadata_fetcher_add(fetchers) | storage.metadata_fetcher_add(fetchers) | ||||
storage.raw_extrinsic_metadata_add(converted) | storage.raw_extrinsic_metadata_add(converted) | ||||
elif object_type == "revision": | |||||
# drop the metadata field from the revision (is any); this field is | |||||
# about to be dropped from the data model (in favor of | |||||
# raw_extrinsic_metadata) and there can be bogus values in the existing | |||||
# journal (metadata with \0000 in it) | |||||
method = getattr(storage, object_type + "_add") | |||||
method( | |||||
[ | |||||
object_converter_fn[object_type](dict_key_dropper(o, ("metadata",))) | |||||
for o in objects | |||||
] | |||||
) | |||||
elif object_type in ( | elif object_type in ( | ||||
"directory", | "directory", | ||||
"extid", | "extid", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"origin", | "origin", | ||||
"metadata_fetcher", | "metadata_fetcher", | ||||
"metadata_authority", | "metadata_authority", | ||||
): | ): | ||||
method = getattr(storage, object_type + "_add") | method = getattr(storage, object_type + "_add") | ||||
method([object_converter_fn[object_type](o) for o in objects]) | method([object_converter_fn[object_type](o) for o in objects]) | ||||
else: | else: | ||||
logger.warning("Received a series of %s, this should not happen", object_type) | logger.warning("Received a series of %s, this should not happen", object_type) |