Changeset View
Standalone View
swh/provenance/storage/replay.py
- This file was added.
# Copyright (C) 2022 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from typing import Any, Callable, Dict, List, Optional | |||||
try: | |||||
from systemd.daemon import notify | |||||
except ImportError: | |||||
notify = None | |||||
from swh.core.statsd import statsd | |||||
from swh.journal.serializers import kafka_to_value | |||||
from swh.provenance.storage.interface import ( | |||||
DirectoryData, | |||||
RelationData, | |||||
RelationType, | |||||
RevisionData, | |||||
) | |||||
from .interface import ProvenanceStorageInterface | |||||
logger = logging.getLogger(__name__) | |||||
REPLAY_OPERATIONS_METRIC = "swh_provenance_replayer_operations_total" | |||||
REPLAY_DURATION_METRIC = "swh_provenance_replayer_duration_seconds" | |||||
def cvrt_directory(msg_d): | |||||
return {msg_d["id"]: DirectoryData(**msg_d["value"])} | |||||
def cvrt_revision(msg_d): | |||||
return {msg_d["id"]: RevisionData(**msg_d["value"])} | |||||
def cvrt_default(msg_d): | |||||
return {msg_d["id"]: msg_d["value"]} | |||||
def cvrt_relation(msg_d): | |||||
return {msg_d["id"]: {RelationData(**v) for v in msg_d["value"]}} | |||||
OBJECT_CONVERTERS: Dict[str, Callable[[Dict], Dict]] = { | |||||
"directory": cvrt_directory, | |||||
"revision": cvrt_revision, | |||||
"content": cvrt_default, | |||||
"location": cvrt_default, | |||||
"content_in_revision": cvrt_relation, | |||||
"content_in_directory": cvrt_relation, | |||||
"directory_in_revision": cvrt_relation, | |||||
} | |||||
class ProvenanceObjectDeserializer: | |||||
def __init__( | |||||
self, | |||||
raise_on_error: bool = False, | |||||
reporter: Optional[Callable[[str, bytes], None]] = None, | |||||
): | |||||
self.reporter = reporter | |||||
self.raise_on_error = raise_on_error | |||||
def convert(self, object_type: str, msg: bytes) -> Optional[Dict]: | |||||
dict_repr = kafka_to_value(msg) | |||||
obj = OBJECT_CONVERTERS[object_type](dict_repr) | |||||
return obj | |||||
def report_failure(self, msg: bytes, obj: Dict): | |||||
if self.reporter: | |||||
self.reporter(obj["id"].hex(), msg) | |||||
def process_replay_objects( | |||||
all_objects: Dict[str, List[Dict]], *, storage: ProvenanceStorageInterface | |||||
) -> None: | |||||
# we do handle objects before relations because relations need objects to | |||||
# already exists beforehand; which is probably very naive in real life | |||||
# scenarios since we won't any guarantee kafka will send us relations and | |||||
# objects topics in the "correct" order; works for tests though... | |||||
obj_types = [ | |||||
"content", | |||||
"directory", | |||||
"revision", | |||||
"location", | |||||
"content_in_revision", | |||||
"content_in_directory", | |||||
"directory_in_revision", | |||||
] | |||||
for object_type in obj_types: | |||||
objects = all_objects[object_type] | |||||
logger.debug("Inserting %s %s objects", len(objects), object_type) | |||||
with statsd.timed(REPLAY_DURATION_METRIC, tags={"object_type": object_type}): | |||||
_insert_objects(object_type, objects, storage) | |||||
statsd.increment( | |||||
REPLAY_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} | |||||
) | |||||
if notify: | |||||
notify("WATCHDOG=1") | |||||
def _insert_objects( | |||||
object_type: str, objects: List[Any], storage: ProvenanceStorageInterface | |||||
) -> None: | |||||
"""Insert objects of type object_type in the storage.""" | |||||
if object_type not in OBJECT_CONVERTERS: | |||||
logger.warning("Received a series of %s, this should not happen", object_type) | |||||
return | |||||
if "_in_" in object_type: | |||||
method = lambda data: storage.relation_add( # noqa: E731 | |||||
relation=RelationType(object_type), data=data | |||||
) | |||||
else: | |||||
method = getattr(storage, f"{object_type}_add") | |||||
data = dict(next(iter(obj.items())) for obj in objects) | |||||
method(data) | |||||
vlorentz: `method` isn't needed.
Also, what's this `dict(next())` thing? Doesn't it discard all but one… | |||||
douarddaAuthorUnsubmitted Done Inline Actionsgeez you're right, it's much better that way... the dict(next()) is because at this point we get a list of 1-element dict like [{id1: val1}, {id2: val2}. ...] which is converted as a simple dict {id1: val1, id2: val2, ...} All this is not very pretty, I know... douardda: geez you're right, it's much better that way...
the `dict(next())` is because at this point… | |||||
vlorentzUnsubmitted Not Done Inline Actionswhere do this singleton dicts come from? swh-journal? vlorentz: where do this singleton dicts come from? swh-journal? | |||||
douarddaAuthorUnsubmitted Done Inline ActionsI'd say it's related with the way I put objects in kafka where I do not "use" the message key so this later is repeated in the message payload (dict), which probably is not necessary... (not sure to remember why I did this that way; I suspect the reason I chose to do that vanished at some point in a later refactoring but I did not notice). douardda: I'd say it's related with the way I put objects in kafka where I do not "use" the message key… | |||||
douarddaAuthorUnsubmitted Done Inline Actionsah no, the way swh-journal currently works make it hard to access the kafka message key, so I had to embed it in the msg.value. douardda: ah no, the way swh-journal currently works make it hard to access the kafka message key, so I… |
method isn't needed.
Also, what's this dict(next()) thing? Doesn't it discard all but one object type at random?