swh/storage/replay.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
from functools import partial
import logging
from typing import Any, Callable
from typing import Counter as CounterT
from typing import Dict, List, Optional, TypeVar, Union, cast

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

from swh.core.statsd import statsd
from swh.journal.serializers import kafka_to_value
from swh.model.hashutil import hash_to_hex
from swh.model.model import (
    BaseContent,
    BaseModel,
    Content,
    Directory,
    ExtID,
    HashableObject,
    MetadataAuthority,
    MetadataFetcher,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    RawExtrinsicMetadata,
    Release,
    Revision,
    SkippedContent,
    Snapshot,
)
from swh.storage.exc import HashCollision, StorageArgumentException
from swh.storage.interface import StorageInterface
from swh.storage.utils import remove_keys

logger = logging.getLogger(__name__)

GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"

OBJECT_CONVERTERS: Dict[str, Callable[[Dict], BaseModel]] = {
    "origin": Origin.from_dict,
    "origin_visit": OriginVisit.from_dict,
    "origin_visit_status": OriginVisitStatus.from_dict,
    "snapshot": Snapshot.from_dict,
    "revision": Revision.from_dict,
    "release": Release.from_dict,
    "directory": Directory.from_dict,
    "content": Content.from_dict,
    "skipped_content": SkippedContent.from_dict,
    "metadata_authority": MetadataAuthority.from_dict,
    "metadata_fetcher": MetadataFetcher.from_dict,
    "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict,
    "extid": ExtID.from_dict,
}
# Deprecated, for BW compat only.
object_converter_fn = OBJECT_CONVERTERS
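
# The fixers below are applied to the dict representation of a journal message
# before conversion to a model object. For revisions this drops the legacy
# "metadata" field: it is being removed from the data model (in favor of
# raw_extrinsic_metadata) and existing journal entries may carry bogus values
# (e.g. metadata containing \0000) that the model would reject.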
OBJECT_FIXERS = {
    "revision": partial(remove_keys, keys=("metadata",)),
}


class ModelObjectDeserializer:
    """A swh.journal object deserializer that checks object validity and
    reports invalid objects.

    The deserializer will directly produce BaseModel objects from journal
    object representations.

    If validation is activated and the object is hashable, it will check if the
    computed hash matches the identifier of the object.

    If the object is invalid and a 'reporter' function is given, it will be
    called with 2 arguments::

        reporter(object_id, journal_msg)

    Where 'object_id' is a string representation of the object identifier (from
    the journal message), and 'journal_msg' is the raw message (bytes)
    retrieved from the journal.

    If 'raise_on_error' is True, a 'StorageArgumentException' exception is
    raised.

    Typical usage::

        deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb)
        client = get_journal_client(
            cls="kafka", value_deserializer=deserializer, **cfg)
    """

    def __init__(
        self,
        validate: bool = True,
        raise_on_error: bool = False,
        reporter: Optional[Callable[[str, bytes], None]] = None,
    ):
        self.validate = validate
        self.reporter = reporter
        self.raise_on_error = raise_on_error

    def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]:
        dict_repr = kafka_to_value(msg)
        if object_type in OBJECT_FIXERS:
            dict_repr = OBJECT_FIXERS[object_type](dict_repr)
        obj = OBJECT_CONVERTERS[object_type](dict_repr)
        if self.validate:
            if isinstance(obj, HashableObject):
                cid = obj.compute_hash()
                if obj.id != cid:
                    error_msg = (
                        f"Object has id {hash_to_hex(obj.id)}, "
                        f"but it should be {hash_to_hex(cid)}: {obj}"
                    )
                    logger.error(error_msg)
                    self.report_failure(msg, obj)
                    if self.raise_on_error:
                        raise StorageArgumentException(error_msg)
                    return None
        return obj
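    # Note: when validation fails and raise_on_error is False, convert()
    # returns None instead of a model object, so downstream consumers of the
    # deserialized stream must be prepared to skip None values.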
    def report_failure(self, msg: bytes, obj: BaseModel):
        if self.reporter:
            oid: str = ""
            if hasattr(obj, "swhid"):
                swhid = obj.swhid()  # type: ignore[attr-defined]
                oid = str(swhid)
            elif isinstance(obj, HashableObject):
                uid = obj.compute_hash()
                oid = f"{obj.object_type}:{uid.hex()}"  # type: ignore[attr-defined]
            if oid:
                self.reporter(oid, msg)
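
# Illustrative sketch (not part of this module): a minimal reporter callback
# matching the (object_id, raw_message) signature documented above could be:
#
#     def report_invalid_object(object_id: str, msg: bytes) -> None:
#         logger.warning("invalid object %s (%d bytes raw message)",
#                        object_id, len(msg))
#
#     deserializer = ModelObjectDeserializer(validate=True,
#                                            reporter=report_invalid_object)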


def process_replay_objects(
    all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface
) -> None:
    for (object_type, objects) in all_objects.items():
        logger.debug("Inserting %s %s objects", len(objects), object_type)
        with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
            _insert_objects(object_type, objects, storage)
        statsd.increment(
            GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type}
        )
    if notify:
notify("WATCHDOG=1") | notify("WATCHDOG=1") | ||||


ContentType = TypeVar("ContentType", bound=BaseContent)


def collision_aware_content_add(
    contents: List[ContentType],
    content_add_fn: Callable[[List[ContentType]], Dict[str, int]],
) -> Dict[str, int]:
    """Add contents to storage. If a hash collision is detected, an error is
    logged. Then this adds the other non-colliding contents to the storage.

    Args:
        contents: List of contents or skipped contents to add to storage
        content_add_fn: Storage content callable

    Returns:
        A summary dict of the added objects, aggregated over the (possibly
        retried) calls to content_add_fn.
    """
    if not contents:
        return {}
    colliding_content_hashes: List[Dict[str, Any]] = []
    results: CounterT[str] = Counter()
    while True:
        try:
            results.update(content_add_fn(contents))
        except HashCollision as e:
            colliding_content_hashes.append(
                {
                    "algo": e.algo,
"hash": e.hash_id, # hex hash id | "hash": e.hash_id, # hex hash id | ||||
"objects": e.colliding_contents, # hex hashes | "objects": e.colliding_contents, # hex hashes | ||||
} | } | ||||
) | ) | ||||
            colliding_hashes = e.colliding_content_hashes()
            # Drop the colliding contents from the transaction
            contents = [c for c in contents if c.hashes() not in colliding_hashes]
        else:
            # Successfully added contents, we are done
            break
    if colliding_content_hashes:
        for collision in colliding_content_hashes:
            logger.error("Collision detected: %(collision)s", {"collision": collision})
    return dict(results)


def _insert_objects(
    object_type: str, objects: List[BaseModel], storage: StorageInterface
) -> None:
    """Insert objects of type object_type in the storage.
    """
    if object_type not in OBJECT_CONVERTERS:
        logger.warning("Received a series of %s, this should not happen", object_type)
        return

    method = getattr(storage, f"{object_type}_add")
    if object_type == "skipped_content":
        method = partial(collision_aware_content_add, content_add_fn=method)
    elif object_type == "content":
        method = partial(
            collision_aware_content_add, content_add_fn=storage.content_add_metadata
        )
    elif object_type in ("origin_visit", "origin_visit_status"):
        origins: List[Origin] = []
        for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects):
            origins.append(Origin(url=obj.origin))
        storage.origin_add(origins)
    elif object_type == "raw_extrinsic_metadata":
        emds = cast(List[RawExtrinsicMetadata], objects)
        authorities = {emd.authority for emd in emds}
        fetchers = {emd.fetcher for emd in emds}
        storage.metadata_authority_add(list(authorities))
        storage.metadata_fetcher_add(list(fetchers))

    method(objects)