D2800.id10049.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
swh.core[db,http] >= 0.0.60
swh.model >= 0.0.60
-swh.storage >= 0.0.172
+swh.storage >= 0.0.177
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,7 @@
import copy
import logging
from time import time
-from typing import Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
from sentry_sdk import capture_exception, push_scope
try:
@@ -22,7 +22,7 @@
from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
from swh.objstorage.objstorage import (
    ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
@@ -237,20 +237,60 @@
    return objects
+def collision_aware_content_add(
+        content_add_fn: Callable[[Iterable[Any]], None],
+        contents: List[BaseContent]) -> None:
+    """Add contents to storage. If a hash collision is detected, an error is
+    logged, the colliding contents are dropped, and the remaining contents
+    are added to the storage.
+
+    Args:
+        content_add_fn: Storage content callable
+        contents: List of contents or skipped contents to add to storage
+
+    """
+    if not contents:
+        return
+    colliding_content_hashes: List[Dict[str, Any]] = []
+    while True:
+        try:
+            content_add_fn(c.to_dict() for c in contents)
+        except HashCollision as e:
+            algo, hash_id, colliding_hashes = e.args
+            colliding_content_hashes.append({
+                'algo': algo,
+                'hash': hash_to_hex(hash_id),
+                'objects': [{k: hash_to_hex(v) for k, v in collision.items()}
+                            for collision in colliding_hashes]
+            })
+            # Drop the colliding contents from the transaction
+            contents = [c for c in contents
+                        if c.hashes() not in colliding_hashes]
+        else:
+            # Successfully added contents, we are done
+            break
+    if colliding_content_hashes:
+        for collision in colliding_content_hashes:
+            logger.error('Collision detected: %(collision)s', {
+                'collision': collision
+            })
+
+
def _insert_objects(object_type, objects, storage):
    objects = fix_objects(object_type, objects)
    if object_type == 'content':
-        try:
-            storage.skipped_content_add(
-                (obj for obj in objects if obj.get('status') == 'absent'))
-        except HashCollision as e:
-            logger.error('(SkippedContent) Hash collision: %s', e.args)
+        contents, skipped_contents = [], []
+        for content in objects:
+            c = BaseContent.from_dict(content)
+            if isinstance(c, SkippedContent):
+                skipped_contents.append(c)
+            else:
+                contents.append(c)
-        try:
-            storage.content_add_metadata(
-                (obj for obj in objects if obj.get('status') != 'absent'))
-        except HashCollision as e:
-            logger.error('(Content) Hash collision: %s', e.args)
+        collision_aware_content_add(
+            storage.skipped_content_add, skipped_contents)
+        collision_aware_content_add(
+            storage.content_add_metadata, contents)
    elif object_type in ('directory', 'revision', 'release',
                         'snapshot', 'origin'):
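
The function above retries content_add_fn in a loop: each HashCollision is logged, the colliding contents are filtered out, and the remaining ones are re-submitted until the add succeeds. A minimal standalone sketch of that retry pattern (not part of this diff; FakeHashCollision and fake_content_add are made-up stand-ins for the storage exception and the storage callable):

class FakeHashCollision(Exception):
    """Stand-in for the storage HashCollision exception caught in replay.py."""

stored = []
attempts = []

def fake_content_add(contents):
    contents = list(contents)
    attempts.append(contents)
    if len(attempts) == 1:
        # First attempt: pretend the first content collides on sha1
        bad = contents[0]
        raise FakeHashCollision('sha1', bad['sha1'], [{'sha1': bad['sha1']}])
    stored.extend(contents)

pending = [{'sha1': b'\x01', 'status': 'visible'},
           {'sha1': b'\x02', 'status': 'visible'}]
while True:
    try:
        fake_content_add(pending)
    except FakeHashCollision as e:
        _algo, _hash_id, colliding = e.args
        colliding_sha1s = {d['sha1'] for d in colliding}
        # Drop the colliding contents, as collision_aware_content_add does
        pending = [c for c in pending if c['sha1'] not in colliding_sha1s]
    else:
        break

assert stored == [{'sha1': b'\x02', 'status': 'visible'}]
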
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -23,7 +23,6 @@
logger = logging.getLogger(__name__)
-
CONTENTS = [
    {
        'length': 3,
@@ -36,6 +35,26 @@
    },
]
+duplicate_content1 = {
+    'length': 4,
+    'sha1': hash_to_bytes(
+        '44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
+    'sha1_git': b'another-foo',
+    'blake2s256': b'another-bar',
+    'sha256': b'another-baz',
+    'status': 'visible',
+}
+
+# Craft a sha1 collision: same sha1, different sha1_git
+duplicate_content2 = duplicate_content1.copy()
+sha1_array = bytearray(duplicate_content1['sha1_git'])
+sha1_array[0] += 1
+duplicate_content2['sha1_git'] = bytes(sha1_array)
+
+
+DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
+
+
COMMITTERS = [
    {
        'fullname': b'foo',
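
The two fixtures above deliberately share the same sha1 (and sha256/blake2s256) while differing on sha1_git, which is what makes the storage report a hash collision when both are inserted. A quick standalone check of that property (a sketch, not part of the fixture file; it only mirrors the distinguishing fields of duplicate_content1 and duplicate_content2):

from swh.model.hashutil import hash_to_bytes

c1 = {
    'sha1': hash_to_bytes('44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
    'sha1_git': b'another-foo',
}
c2 = dict(c1)
sha1_git = bytearray(c1['sha1_git'])
sha1_git[0] += 1
c2['sha1_git'] = bytes(sha1_git)

assert c1['sha1'] == c2['sha1']          # identical sha1: this is the collision
assert c1['sha1_git'] != c2['sha1_git']  # but the two contents are distinct
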
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,13 +1,14 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
+import logging
import random
from subprocess import Popen
-from typing import Tuple
+from typing import Dict, Tuple
import dateutil
from confluent_kafka import Producer
@@ -19,8 +20,10 @@
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Content
-from .conftest import OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter
@@ -33,10 +36,22 @@
}
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+    return kafka_prefix + '.' + object_type
+
+
def test_storage_play(
        kafka_prefix: str,
        kafka_consumer_group: str,
-        kafka_server: Tuple[Popen, int]):
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    """Optimal replayer scenario.
+
+    This:
+    - writes objects to the topic
+    - the replayer consumes the objects from the topic and replays them
+
+    """
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'
@@ -54,7 +69,112 @@
    nb_sent = 0
    nb_visits = 0
    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = kafka_prefix + '.' + object_type
+        topic = make_topic(kafka_prefix, object_type)
+        for object_ in objects:
+            key = bytes(random.randint(0, 255) for _ in range(40))
+            object_ = object_.copy()
+            if object_type == 'content':
+                object_['ctime'] = now
+            elif object_type == 'origin_visit':
+                nb_visits += 1
+                object_['visit'] = nb_visits
+            producer.produce(
+                topic=topic, key=key_to_kafka(key),
+                value=value_to_kafka(object_),
+            )
+            nb_sent += 1
+
+    producer.flush()
+
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
+    # Fill the storage from Kafka
+    replayer = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = 0
+    while nb_inserted < nb_sent:
+        nb_inserted += replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    # Check the objects were actually inserted in the storage
+    assert OBJECT_TYPE_KEYS['revision'][1] == \
+        list(storage.revision_get(
+            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
+    assert OBJECT_TYPE_KEYS['release'][1] == \
+        list(storage.release_get(
+            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
+
+    origins = list(storage.origin_get(
+        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
+    assert OBJECT_TYPE_KEYS['origin'][1] == \
+        [{'url': orig['url']} for orig in origins]
+    for origin in origins:
+        origin_url = origin['url']
+        expected_visits = [
+            {
+                **visit,
+                'origin': origin_url,
+                'date': dateutil.parser.parse(visit['date']),
+            }
+            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
+            if visit['origin'] == origin['url']
+        ]
+        actual_visits = list(storage.origin_visit_get(
+            origin_url))
+        for visit in actual_visits:
+            del visit['visit']  # opaque identifier
+        assert expected_visits == actual_visits
+
+    input_contents = OBJECT_TYPE_KEYS['content'][1]
+    contents = storage.content_get_metadata(
+        [cont['sha1'] for cont in input_contents])
+    assert len(contents) == len(input_contents)
+    assert contents == {cont['sha1']: [cont] for cont in input_contents}
+
+    collision = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Collision detected:' in logtext:
+            collision += 1
+
+    assert collision == 0, "No collision should be detected"
+
+
+def test_storage_play_with_collision(
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    """Another replayer scenario, this time with hash collisions.
+
+    This:
+    - writes objects to the topic, including colliding contents
+    - the replayer consumes the objects from the topic and replays them
+    - the colliding contents are dropped from the replay when detected
+
+    """
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    storage = get_storage(**storage_config)
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    now = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    # Fill Kafka
+    nb_sent = 0
+    nb_visits = 0
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        topic = make_topic(kafka_prefix, object_type)
        for object_ in objects:
            key = bytes(random.randint(0, 255) for _ in range(40))
            object_ = object_.copy()
@@ -69,8 +189,20 @@
            )
            nb_sent += 1
+    # Craft a collision in the input data; the colliding contents are not
+    # expected to be written to the destination storage
+    for content in DUPLICATE_CONTENTS:
+        topic = make_topic(kafka_prefix, 'content')
+        producer.produce(
+            topic=topic, key=key_to_kafka(key),
+            value=value_to_kafka(content),
+        )
+
+        nb_sent += 1
+
    producer.flush()
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
    # Fill the storage from Kafka
    replayer = JournalClient(
        brokers='localhost:%d' % kafka_server[1],
@@ -119,6 +251,31 @@
    assert len(contents) == len(input_contents)
    assert contents == {cont['sha1']: [cont] for cont in input_contents}
+    nb_collisions = 0
+
+    actual_collision: Dict
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Collision detected:' in logtext:
+            nb_collisions += 1
+            actual_collision = record.args['collision']
+
+    assert nb_collisions == 1, "1 collision should be detected"
+
+    algo = 'sha1'
+    assert actual_collision['algo'] == algo
+    expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
+    assert actual_collision['hash'] == expected_colliding_hash
+
+    actual_colliding_hashes = actual_collision['objects']
+    assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
+    for content in DUPLICATE_CONTENTS:
+        expected_content_hashes = {
+            k: hash_to_hex(v)
+            for k, v in Content.from_dict(content).hashes().items()
+        }
+        assert expected_content_hashes in actual_colliding_hashes
+
def _test_write_replay_origin_visit(visits):
    """Helper function to write tests for origin_visit.
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
  testing
deps =
  pytest-cov
+  dev: pdbpp
setenv =
  SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
commands =
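
The added dev: pdbpp line is a factor-conditional dependency: pdbpp (a drop-in replacement for pdb) is only installed in tox environments whose name includes the dev factor, e.g. an invocation along the lines of tox -e py3-dev (the exact environment name used locally is an assumption here), so the default test environments are unaffected.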
