D2800: replayer: Filter out colliding contents when replaying
D2800.id10049.diff
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
swh.core[db,http] >= 0.0.60
swh.model >= 0.0.60
-swh.storage >= 0.0.172
+swh.storage >= 0.0.177
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,7 @@
import copy
import logging
from time import time
-from typing import Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
from sentry_sdk import capture_exception, push_scope
try:
@@ -22,7 +22,7 @@
from swh.core.statsd import statsd
from swh.model.identifiers import normalize_timestamp
from swh.model.hashutil import hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
from swh.objstorage.objstorage import (
ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
)
@@ -237,20 +237,60 @@
return objects
+def collision_aware_content_add(
+ content_add_fn: Callable[[Iterable[Any]], None],
+ contents: List[BaseContent]) -> None:
+ """Add contents to storage. If a hash collision is detected, an error is
+ logged. Then this adds the other non colliding contents to the storage.
+
+ Args:
+ content_add_fn: Storage content callable
+ contents: List of contents or skipped contents to add to storage
+
+ """
+ if not contents:
+ return
+ colliding_content_hashes: List[Dict[str, Any]] = []
+ while True:
+ try:
+ content_add_fn(c.to_dict() for c in contents)
+ except HashCollision as e:
+ algo, hash_id, colliding_hashes = e.args
+ hash_id = hash_to_hex(hash_id)
+ colliding_content_hashes.append({
+ 'algo': algo,
+                'hash': hash_id,
+ 'objects': [{k: hash_to_hex(v) for k, v in collision.items()}
+ for collision in colliding_hashes]
+ })
+ # Drop the colliding contents from the transaction
+ contents = [c for c in contents
+ if c.hashes() not in colliding_hashes]
+ else:
+ # Successfully added contents, we are done
+ break
+ if colliding_content_hashes:
+ for collision in colliding_content_hashes:
+ logger.error('Collision detected: %(collision)s', {
+ 'collision': collision
+ })
+
+
def _insert_objects(object_type, objects, storage):
objects = fix_objects(object_type, objects)
if object_type == 'content':
- try:
- storage.skipped_content_add(
- (obj for obj in objects if obj.get('status') == 'absent'))
- except HashCollision as e:
- logger.error('(SkippedContent) Hash collision: %s', e.args)
+ contents, skipped_contents = [], []
+ for content in objects:
+ c = BaseContent.from_dict(content)
+ if isinstance(c, SkippedContent):
+ skipped_contents.append(c)
+ else:
+ contents.append(c)
- try:
- storage.content_add_metadata(
- (obj for obj in objects if obj.get('status') != 'absent'))
- except HashCollision as e:
- logger.error('(Content) Hash collision: %s', e.args)
+ collision_aware_content_add(
+ storage.skipped_content_add, skipped_contents)
+ collision_aware_content_add(
+ storage.content_add_metadata, contents)
elif object_type in ('directory', 'revision', 'release',
'snapshot', 'origin'):
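Note on the pattern above: collision_aware_content_add retries the whole batch, dropping the contents whose hashes the storage reported as colliding, until the insert goes through. The sketch below is illustrative only and not part of this diff: the add() stub, its fake backend behaviour and the sample content dicts are made up, and HashCollision here merely stands in for the storage exception, which the replayer unpacks as (algo, hash_id, colliding_hashes).

# Illustrative sketch of the retry-and-drop loop (not part of this diff).
from typing import Any, Dict, List, Set


class HashCollision(Exception):
    """Stand-in for the storage exception; args are (algo, hash_id, colliding hashes)."""


def add(batch: List[Dict[str, Any]], stored_sha1s: Set[bytes]) -> None:
    """Fake backend: reject the whole batch on any duplicate sha1."""
    batch_sha1s: Set[bytes] = set()
    for c in batch:
        if c['sha1'] in stored_sha1s or c['sha1'] in batch_sha1s:
            raise HashCollision('sha1', c['sha1'], [{'sha1': c['sha1']}])
        batch_sha1s.add(c['sha1'])
    stored_sha1s.update(batch_sha1s)


contents = [{'sha1': b'aa'}, {'sha1': b'bb'}, {'sha1': b'aa'}]
stored: Set[bytes] = set()
while True:
    try:
        add(contents, stored)
    except HashCollision as e:
        algo, hash_id, _colliding = e.args
        # Drop every content sharing the colliding hash, then retry with the rest.
        contents = [c for c in contents if c[algo] != hash_id]
    else:
        break

print(stored)  # {b'bb'}: only the non-colliding content was added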
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -23,7 +23,6 @@
logger = logging.getLogger(__name__)
-
CONTENTS = [
{
'length': 3,
@@ -36,6 +35,26 @@
},
]
+duplicate_content1 = {
+ 'length': 4,
+ 'sha1': hash_to_bytes(
+ '44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
+ 'sha1_git': b'another-foo',
+ 'blake2s256': b'another-bar',
+ 'sha256': b'another-baz',
+ 'status': 'visible',
+}
+
+# Craft a sha1 collision: same sha1, different sha1_git
+duplicate_content2 = duplicate_content1.copy()
+sha1_git_array = bytearray(duplicate_content1['sha1_git'])
+sha1_git_array[0] += 1
+duplicate_content2['sha1_git'] = bytes(sha1_git_array)
+
+
+DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
+
+
COMMITTERS = [
{
'fullname': b'foo',
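Note on DUPLICATE_CONTENTS above: the two dicts are identical except for sha1_git, i.e. two distinct contents that share the same sha1, so replaying both makes the storage report a sha1 collision. A minimal, self-contained restatement of that construction (illustrative only; bytes.fromhex stands in for swh.model.hashutil.hash_to_bytes):

# Illustrative restatement of the collision pair (not part of this diff).
content_a = {
    'sha1': bytes.fromhex('44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
    'sha1_git': b'another-foo',
}
content_b = dict(content_a)
mutated = bytearray(content_b['sha1_git'])
mutated[0] += 1  # flip one byte of sha1_git only
content_b['sha1_git'] = bytes(mutated)

assert content_a['sha1'] == content_b['sha1']          # the sha1 index collides
assert content_a['sha1_git'] != content_b['sha1_git']  # yet the contents differ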
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,13 +1,14 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
+import logging
import random
from subprocess import Popen
-from typing import Tuple
+from typing import Dict, Tuple
import dateutil
from confluent_kafka import Producer
@@ -19,8 +20,10 @@
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Content
-from .conftest import OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter
@@ -33,10 +36,22 @@
}
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+ return kafka_prefix + '.' + object_type
+
+
def test_storage_play(
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int]):
+ kafka_server: Tuple[Popen, int],
+ caplog):
+ """Optimal replayer scenario.
+
+ This:
+ - writes objects to the topic
+    - replayer consumes objects from the topic and replays them
+
+ """
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
@@ -54,7 +69,112 @@
nb_sent = 0
nb_visits = 0
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
- topic = kafka_prefix + '.' + object_type
+ topic = make_topic(kafka_prefix, object_type)
+ for object_ in objects:
+ key = bytes(random.randint(0, 255) for _ in range(40))
+ object_ = object_.copy()
+ if object_type == 'content':
+ object_['ctime'] = now
+ elif object_type == 'origin_visit':
+ nb_visits += 1
+ object_['visit'] = nb_visits
+ producer.produce(
+ topic=topic, key=key_to_kafka(key),
+ value=value_to_kafka(object_),
+ )
+ nb_sent += 1
+
+ producer.flush()
+
+ caplog.set_level(logging.ERROR, 'swh.journal.replay')
+ # Fill the storage from Kafka
+ replayer = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ )
+ worker_fn = functools.partial(process_replay_objects, storage=storage)
+ nb_inserted = 0
+ while nb_inserted < nb_sent:
+ nb_inserted += replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+
+ # Check the objects were actually inserted in the storage
+ assert OBJECT_TYPE_KEYS['revision'][1] == \
+ list(storage.revision_get(
+ [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
+ assert OBJECT_TYPE_KEYS['release'][1] == \
+ list(storage.release_get(
+ [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
+
+ origins = list(storage.origin_get(
+ [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
+ assert OBJECT_TYPE_KEYS['origin'][1] == \
+ [{'url': orig['url']} for orig in origins]
+ for origin in origins:
+ origin_url = origin['url']
+ expected_visits = [
+ {
+ **visit,
+ 'origin': origin_url,
+ 'date': dateutil.parser.parse(visit['date']),
+ }
+ for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
+ if visit['origin'] == origin['url']
+ ]
+ actual_visits = list(storage.origin_visit_get(
+ origin_url))
+ for visit in actual_visits:
+ del visit['visit'] # opaque identifier
+ assert expected_visits == actual_visits
+
+ input_contents = OBJECT_TYPE_KEYS['content'][1]
+ contents = storage.content_get_metadata(
+ [cont['sha1'] for cont in input_contents])
+ assert len(contents) == len(input_contents)
+ assert contents == {cont['sha1']: [cont] for cont in input_contents}
+
+ collision = 0
+ for record in caplog.records:
+ logtext = record.getMessage()
+        if 'Collision detected:' in logtext:
+ collision += 1
+
+ assert collision == 0, "No collision should be detected"
+
+
+def test_storage_play_with_collision(
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+ caplog):
+ """Another replayer scenario with collisions.
+
+ This:
+ - writes objects to the topic, including colliding contents
+    - replayer consumes objects from the topic and replays them
+    - colliding contents are detected and dropped from the replay
+
+ """
+ (_, port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
+
+ storage = get_storage(**storage_config)
+
+ producer = Producer({
+ 'bootstrap.servers': 'localhost:{}'.format(port),
+ 'client.id': 'test producer',
+ 'enable.idempotence': 'true',
+ })
+
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+
+ # Fill Kafka
+ nb_sent = 0
+ nb_visits = 0
+ for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+ topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
@@ -69,8 +189,20 @@
)
nb_sent += 1
+    # Create a collision in the input data
+    # The colliding contents are not written to the destination
+ for content in DUPLICATE_CONTENTS:
+ topic = make_topic(kafka_prefix, 'content')
+ producer.produce(
+ topic=topic, key=key_to_kafka(key),
+ value=value_to_kafka(content),
+ )
+
+ nb_sent += 1
+
producer.flush()
+ caplog.set_level(logging.ERROR, 'swh.journal.replay')
# Fill the storage from Kafka
replayer = JournalClient(
brokers='localhost:%d' % kafka_server[1],
@@ -119,6 +251,31 @@
assert len(contents) == len(input_contents)
assert contents == {cont['sha1']: [cont] for cont in input_contents}
+ nb_collisions = 0
+
+ actual_collision: Dict
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if 'Collision detected:' in logtext:
+ nb_collisions += 1
+ actual_collision = record.args['collision']
+
+ assert nb_collisions == 1, "1 collision should be detected"
+
+ algo = 'sha1'
+ assert actual_collision['algo'] == algo
+ expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
+ assert actual_collision['hash'] == expected_colliding_hash
+
+ actual_colliding_hashes = actual_collision['objects']
+ assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
+ for content in DUPLICATE_CONTENTS:
+ expected_content_hashes = {
+ k: hash_to_hex(v)
+ for k, v in Content.from_dict(content).hashes().items()
+ }
+ assert expected_content_hashes in actual_colliding_hashes
+
def _test_write_replay_origin_visit(visits):
"""Helper function to write tests for origin_visit.
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
testing
deps =
pytest-cov
+ dev: pdbpp
setenv =
SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
commands =