swh/journal/replay.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import logging
from time import time
-from typing import Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional

from sentry_sdk import capture_exception, push_scope

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

from tenacity import (
    retry, retry_if_exception_type, stop_after_attempt,
    wait_random_exponential,
)

from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE

from swh.objstorage.objstorage import (
    ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
from swh.storage import HashCollision

logger = logging.getLogger(__name__)

GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
[… 197 lines not shown …]

def fix_objects(object_type, objects):
""" # noqa | """ # noqa | ||||
if object_type == 'revision': | if object_type == 'revision': | ||||
objects = _fix_revisions(objects) | objects = _fix_revisions(objects) | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
objects = _fix_origin_visits(objects) | objects = _fix_origin_visits(objects) | ||||
return objects | return objects | ||||
olasd: Not a fan of that name; `collision_aware_content_add`?
ardumont: ok
+def collision_aware_content_add(
+        content_add_fn: Callable[[Iterable[Any]], None],
+        contents: List[BaseContent]) -> None:
+    """Add contents to storage. If a hash collision is detected, an error is
+    logged. Then this adds the other non-colliding contents to the storage.
+
+    Args:
+        content_add_fn: Storage content callable
+        contents: List of contents or skipped contents to add to storage
+
+    """
+    if not contents:
+        return
+    colliding_content_hashes: List[Dict[str, Any]] = []
+    while True:
+        try:
+            content_add_fn(c.to_dict() for c in contents)
+        except HashCollision as e:
+            algo, hash_id, colliding_hashes = e.args
olasd: I think this should really be a while loop, starting with an empty set of colliding_contents and a full list of contents, then winnowing that down until: …
The current implementation breaks if you have two pairs of colliding contents in the same batch, which doesn't sound out of the question, as people might have added several sets of colliding pdfs to a given repo.
You can also avoid the conversion function argument by converting objects to BaseContents outside of that call, then unconditionally calling to_dict when calling content_add_fn inside this function.
ardumont:
> I think this should really be a while loop […]
ok, sounds better indeed.
> The current implementation breaks if you have two pairs of colliding contents in the same batch […]
Indeed. I missed that. meh, i prefer your proposal ;)
> You can also avoid the conversion function argument […]
I tried and it does not work. I did not check further than that but i think it's the …
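To make the failure mode concrete, here is a self-contained toy contrasting the two behaviours (HashCollision is stubbed locally and contents are plain dicts, so this is an illustration of the discussion, not the patch itself):

# Toy illustration of the failure mode discussed above; HashCollision is
# stubbed and contents are plain dicts, unlike the real swh types.
class HashCollision(Exception):
    pass

def flaky_add(contents, colliding_ids=frozenset('ac')):
    """Stand-in for a storage add: rejects the first colliding content."""
    for c in contents:
        if c['id'] in colliding_ids:
            raise HashCollision('sha1', c['id'], [c])

batch = [{'id': i} for i in 'abcd']

# Single try/except: 'b' and 'd' are silently dropped after the first
# collision on 'a', and the independent collision on 'c' is never reported.
try:
    flaky_add(batch)
except HashCollision as e:
    print('gave up after:', e.args[1])

# While loop: winnow out each colliding content and retry, so every
# collision gets reported and the non-colliding contents still go through.
remaining = batch
while remaining:
    try:
        flaky_add(remaining)
        break  # everything left was accepted
    except HashCollision as e:
        _, colliding_id, _ = e.args
        print('collision on:', colliding_id)
        remaining = [c for c in remaining if c['id'] != colliding_id]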
olasd:
> I tried and it does not work.
> I did not check further than that but i think it's the …
I'm not sure why the following wouldn't work: …
At some point, when the validate filter gets dropped, we can just drop the .as_dict() call. And the conversion function shouldn't be needed anymore.
ardumont:
> In the collision_aware_content_add function, when calling out to storage, pass the objects …
well, yes, i did not add that extra conversion indeed... But ok, let's do that, it's not happening that often and it's "temporary".
ardumont:
> You can also avoid the conversion function argument, by converting objects to BaseContents outside of that call […]
Ok, i meant i tried to do the conversion outside the loop. Doing exactly what you say should work, but i was wary of that much back and forth in the first place...
+            hash_id = hash_to_hex(hash_id)
olasd: So, now that I've given it a thorough think, I'm worried about the "cardinality" of this data. When logging in production, we "flatten" the logging.foo() arguments, as journald only supports flat key-value pairs. The schema you've implemented for this data means that, for each collision, we'll pass the following k/v pairs to the logging framework:

  swh_hashes_{algo}-{colliding_id}_0_sha1=<sha1 of content 0>
  swh_hashes_{algo}-{colliding_id}_0_sha1_git=<sha1_git of content 0>
  swh_hashes_{algo}-{colliding_id}_0_sha256=<sha256 of content 0>
  swh_hashes_{algo}-{colliding_id}_0_blake2s256=<blake2s256 of content 0>
  swh_hashes_{algo}-{colliding_id}_1_sha1=<sha1 of content 1>
  swh_hashes_{algo}-{colliding_id}_1_sha1_git=<sha1_git of content 1>
  swh_hashes_{algo}-{colliding_id}_1_sha256=<sha256 of content 1>
  swh_hashes_{algo}-{colliding_id}_1_blake2s256=<blake2s256 of content 1>

We're going to create 8 new fields per collision in our logging infra, which is not great (for instance, elasticsearch doesn't let any given index have more than 1000 fields). It also makes queries harder (because often, the names of the fields are fixed in queries). I think we should make the algorithm and the colliding hash values, rather than a part of the key.

All in all, I suggest turning colliding_content_hashes into a collisions list, with values following the schema:

  {
      'algorithm': algo,
      'hash': hash_id,
      'objects': colliding_hashes,
  }

and logging a different error for each item in the collisions list; each error will generate the following key/values:

  swh_collision_algorithm=<algorithm>
  swh_collision_hash=<colliding hash>
  swh_collision_objects_0_sha1=<sha1 of content 0>
  swh_collision_objects_0_sha1_git=<sha1_git of content 0>
  swh_collision_objects_0_sha256=<sha256 of content 0>
  swh_collision_objects_0_blake2s256=<blake2s256 of content 0>
  swh_collision_objects_1_sha1=<sha1 of content 1>
  swh_collision_objects_1_sha1_git=<sha1_git of content 1>
  swh_collision_objects_1_sha256=<sha256 of content 1>
  swh_collision_objects_1_blake2s256=<blake2s256 of content 1>
  [... if we're unlucky, more objects go here ...]

This makes the set of logging keys constant, which avoids making our logging framework blow up, and allows us to do meaningful queries on the data. We also need to encode all values as strings, as I don't think binary data is passed quite properly through the full logging stack...
+            colliding_content_hashes.append({
+                'algo': algo,
+                'hash': hash_id,  # hash_id was hex-encoded just above
+                'objects': [{k: hash_to_hex(v) for k, v in collision.items()}
+                            for collision in colliding_hashes]
+            })
+            # Drop the colliding contents from the transaction
+            contents = [c for c in contents
+                        if c.hashes() not in colliding_hashes]
+        else:
olasd: Following the previous comment, this should turn into a for loop. You should rename the key collision instead of hashes, which would give the generated k/v pairs more meaning.
+            # Successfully added contents, we are done
+            break
+    if colliding_content_hashes:
+        for collision in colliding_content_hashes:
+            logger.error('Collision detected: %(collision)s', {
+                'collision': collision
+            })
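For reference, a minimal sketch of the constant-key logging olasd suggests above; the swh_collision_* field names come from his comment, while passing them through `extra` is an assumption about how the journald handler picks up fields, not the final patch:

# Sketch only: emit one log record per collision, with a flat, fixed set
# of field names; the `extra` mechanism is an assumption about the setup.
for collision in colliding_content_hashes:
    fields = {
        'swh_collision_algorithm': collision['algo'],
        'swh_collision_hash': collision['hash'],
    }
    for i, obj in enumerate(collision['objects']):
        for algo, value in obj.items():
            # values are hex strings already, so nothing binary leaks
            # into the logging stack
            fields['swh_collision_objects_%d_%s' % (i, algo)] = value
    logger.error('Collision detected on %s:%s',
                 collision['algo'], collision['hash'], extra=fields)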
def _insert_objects(object_type, objects, storage):
    objects = fix_objects(object_type, objects)
    if object_type == 'content':
-        try:
-            storage.skipped_content_add(
-                (obj for obj in objects if obj.get('status') == 'absent'))
-        except HashCollision as e:
-            logger.error('(SkippedContent) Hash collision: %s', e.args)
-        try:
-            storage.content_add_metadata(
-                (obj for obj in objects if obj.get('status') != 'absent'))
-        except HashCollision as e:
-            logger.error('(Content) Hash collision: %s', e.args)
+        contents, skipped_contents = [], []
+        for content in objects:
+            c = BaseContent.from_dict(content)
+            if isinstance(c, SkippedContent):
+                skipped_contents.append(c)
+            else:
+                contents.append(c)
+        collision_aware_content_add(
+            storage.skipped_content_add, skipped_contents)
+        collision_aware_content_add(
+            storage.content_add_metadata, contents)

olasd: I think we should collapse the two for loops into a single one building two lists, which allows us to: …

    elif object_type in ('directory', 'revision', 'release',
                         'snapshot', 'origin'):
        # TODO: split batches that are too large for the storage
        # to handle?
        method = getattr(storage, object_type + '_add')
        method(objects)
    elif object_type == 'origin_visit':
        for visit in objects:
            storage.origin_add_one({'url': visit['origin']})
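For context on the combined loop in the content branch: a minimal sketch of the dispatch it relies on, per my reading of BaseContent.from_dict (exact field validation varies across swh.model versions, so treat this as illustrative):

# Sketch: BaseContent.from_dict picks the subclass from the 'status'
# field ('absent' yields a SkippedContent, anything else a Content),
# which is what lets one loop replace the two status-based filters.
from swh.model.model import BaseContent, SkippedContent

def split_by_status(content_dicts):
    contents, skipped_contents = [], []
    for d in content_dicts:
        c = BaseContent.from_dict(d)  # validates and chooses the subclass
        if isinstance(c, SkippedContent):
            skipped_contents.append(c)  # had status == 'absent'
        else:
            contents.append(c)
    return contents, skipped_contents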
[… 256 lines not shown …]