diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index 4112ea0..6ab40e6 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,640 +1,640 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
import copy
import functools
import logging
import re
import tempfile
from subprocess import Popen
from typing import Any, Dict, Tuple
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Producer
import pytest
import yaml
from swh.model.hashutil import hash_to_hex
from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage import get_storage
from swh.journal.cli import cli
from swh.journal.replay import CONTENT_REPLAY_RETRIES
from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
CLI_CONFIG = {
'storage': {
'cls': 'memory',
},
'objstorage_src': {
'cls': 'mocked',
'name': 'src',
},
'objstorage_dst': {
'cls': 'mocked',
'name': 'dst',
},
}
@pytest.fixture
def storage():
"""An swh-storage object that gets injected into the CLI functions."""
storage_config = {
'cls': 'pipeline',
'steps': [
{'cls': 'validate'},
{'cls': 'memory'},
]
}
storage = get_storage(**storage_config)
with patch('swh.journal.cli.get_storage') as get_storage_mock:
get_storage_mock.return_value = storage
yield storage
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
from swh.journal.replay import copy_object, obj_in_objstorage
monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None)
def invoke(*args, env=None, journal_config=None):
config = copy.deepcopy(CLI_CONFIG)
if journal_config:
config['journal'] = journal_config
runner = CliRunner()
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ['-C' + config_fd.name] + list(args)
return runner.invoke(
cli, args, obj={'log_level': logging.DEBUG}, env=env,
)
def test_replay(
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
producer = Producer({
'bootstrap.servers': 'localhost:{}'.format(kafka_port),
'client.id': 'test-producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
snapshot = {'id': b'foo', 'branches': {
b'HEAD': {
'target_type': 'revision',
'target': b'\x01'*20,
}
}} # type: Dict[str, Any]
producer.produce(
topic=kafka_prefix+'.snapshot',
key=key_to_kafka(snapshot['id']),
value=value_to_kafka(snapshot),
)
producer.flush()
logger.debug('Flushed producer')
result = invoke(
'replay',
'--stop-after-objects', '1',
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
assert storage.snapshot_get(snapshot['id']) == {
**snapshot, 'next_branch': None}
def _patch_objstorages(names):
objstorages = {name: InMemoryObjStorage() for name in names}
def get_mock_objstorage(cls, **args):
assert cls == 'mocked', cls
return objstorages[args['name']]
def decorator(f):
@functools.wraps(f)
@patch('swh.journal.cli.get_objstorage')
def newf(get_objstorage_mock, *args, **kwargs):
get_objstorage_mock.side_effect = get_mock_objstorage
f(*args, objstorages=objstorages, **kwargs)
return newf
return decorator
NUM_CONTENTS = 10
def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
producer = Producer({
'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
'client.id': 'test-producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
contents = {}
for i in range(NUM_CONTENTS):
content = b'\x00'*19 + bytes([i])
sha1 = objstorages['src'].add(content)
contents[sha1] = content
producer.produce(
topic=kafka_prefix+'.content',
key=key_to_kafka(sha1),
value=key_to_kafka({
'sha1': sha1,
'status': 'visible',
}),
)
producer.flush()
return contents
@_patch_objstorages(['src', 'dst'])
def test_replay_content(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
@_patch_objstorages(['src', 'dst'])
def test_replay_content_structured_log(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
caplog.set_level(logging.DEBUG, 'swh.journal.replay')
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = set()
for record in caplog.records:
logtext = record.getMessage()
if 'copied' in logtext:
copied.add(record.args['obj_id'])
assert copied == expected_obj_ids, (
"Mismatched logging; see captured log output for details."
)
@_patch_objstorages(['src', 'dst'])
def test_replay_content_static_group_id(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
# Setup log capture to fish the consumer settings out of the log messages
caplog.set_level(logging.DEBUG, 'swh.journal.client')
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
env={'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'},
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
consumer_settings = None
for record in caplog.records:
if 'Consumer settings' in record.message:
consumer_settings = record.args
break
assert consumer_settings is not None, (
'Failed to get consumer settings out of the consumer log. '
'See log capture for details.'
)
assert consumer_settings['group.instance.id'] == 'static-group-instance-id'
assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000
assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000
for (sha1, content) in contents.items():
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
@_patch_objstorages(['src', 'dst'])
def test_replay_content_exclude(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
excluded_contents = list(contents)[0::2] # picking half of them
with tempfile.NamedTemporaryFile(mode='w+b') as fd:
fd.write(b''.join(sorted(excluded_contents)))
fd.seek(0)
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
'--exclude-sha1-file', fd.name,
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
if sha1 in excluded_contents:
assert sha1 not in objstorages['dst'], sha1
else:
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
NUM_CONTENTS_DST = 5
@_patch_objstorages(['src', 'dst'])
@pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [
(True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
(False, NUM_CONTENTS, 0),
])
def test_replay_content_check_dst(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
check_dst: bool,
expected_copied: int,
expected_in_dst: int,
caplog):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages['dst'].add(content, obj_id=sha1)
caplog.set_level(logging.DEBUG, 'swh.journal.replay')
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
'--check-dst' if check_dst else '--no-check-dst',
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
if 'copied' in logtext:
copied += 1
elif 'in dst' in logtext:
in_dst += 1
assert (copied == expected_copied and in_dst == expected_in_dst), (
"Unexpected amount of objects copied, see the captured log for details"
)
for (sha1, content) in contents.items():
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
class FlakyObjStorage(InMemoryObjStorage):
def __init__(self, *args, **kwargs):
state = kwargs.pop('state')
self.failures_left = Counter(kwargs.pop('failures'))
super().__init__(*args, **kwargs)
if state:
self.state = state
def flaky_operation(self, op, obj_id):
if self.failures_left[op, obj_id] > 0:
self.failures_left[op, obj_id] -= 1
raise RuntimeError(
'Failed %s on %s' % (op, hash_to_hex(obj_id))
)
def get(self, obj_id):
self.flaky_operation('get', obj_id)
return super().get(obj_id)
def add(self, data, obj_id=None, check_presence=True):
self.flaky_operation('add', obj_id)
return super().add(data, obj_id=obj_id,
check_presence=check_presence)
def __contains__(self, obj_id):
self.flaky_operation('in', obj_id)
return super().__contains__(obj_id)
@_patch_objstorages(['src', 'dst'])
def test_replay_content_check_dst_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
monkeypatch_retry_sleep):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages['dst'].add(content, obj_id=sha1)
failures['in', sha1] = 1
orig_dst = objstorages['dst']
objstorages['dst'] = FlakyObjStorage(state=orig_dst.state,
failures=failures)
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
'--check-dst',
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
@_patch_objstorages(['src', 'dst'])
def test_replay_content_failed_copy_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
monkeypatch_retry_sleep):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
add_failures = {}
get_failures = {}
definitely_failed = set()
# We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
# We generate failures for 2 different operations, get and add.
num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
assert num_retry_contents < NUM_CONTENTS, (
"Need to generate more test contents to properly test retry behavior"
)
for i, sha1 in enumerate(contents):
if i >= num_retry_contents:
break
# This generates a number of failures, up to CONTENT_REPLAY_RETRIES
num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
# This generates failures of add for the first CONTENT_REPLAY_RETRIES
# objects, then failures of get.
if i < CONTENT_REPLAY_RETRIES:
add_failures['add', sha1] = num_failures
else:
get_failures['get', sha1] = num_failures
# Only contents that have CONTENT_REPLAY_RETRIES or more are
# definitely failing
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
objstorages['dst'] = FlakyObjStorage(
state=objstorages['dst'].state,
failures=add_failures,
)
objstorages['src'] = FlakyObjStorage(
state=objstorages['src'].state,
failures=get_failures,
)
caplog.set_level(logging.DEBUG, 'swh.journal.replay')
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
actually_failed = set()
for record in caplog.records:
logtext = record.getMessage()
if 'copied' in logtext:
copied += 1
elif 'Failed operation' in logtext:
assert record.levelno == logging.ERROR
assert record.args['retries'] == CONTENT_REPLAY_RETRIES
actually_failed.add(record.args['obj_id'])
assert actually_failed == definitely_failed, (
'Unexpected object copy failures; see captured log for details'
)
for (sha1, content) in contents.items():
if hash_to_hex(sha1) in definitely_failed:
assert sha1 not in objstorages['dst']
continue
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
@_patch_objstorages(['src', 'dst'])
def test_replay_content_objnotfound(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog):
(_, kafka_port) = kafka_server
kafka_prefix += '.swh.journal.objects'
contents = _fill_objstorage_and_kafka(
kafka_port, kafka_prefix, objstorages)
num_contents_deleted = 5
contents_deleted = set()
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
del objstorages['src'].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
caplog.set_level(logging.DEBUG, 'swh.journal.replay')
result = invoke(
'content-replay',
'--stop-after-objects', str(NUM_CONTENTS),
journal_config={
'brokers': ['127.0.0.1:%d' % kafka_port],
'group_id': kafka_consumer_group,
'prefix': kafka_prefix,
},
)
expected = r'Done.\n'
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
if 'copied' in logtext:
copied += 1
elif 'object not found' in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args['obj_id'])
assert copied == NUM_CONTENTS - num_contents_deleted, (
"Unexpected number of contents copied"
)
assert not_in_src == contents_deleted, (
"Mismatch between deleted contents and not_in_src logs"
)
for (sha1, content) in contents.items():
if sha1 not in objstorages['src']:
continue
assert sha1 in objstorages['dst'], sha1
assert objstorages['dst'].get(sha1) == content
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 954d36d..122f304 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,138 +1,138 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from subprocess import Popen
from typing import Dict, List, Tuple
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.model.hypothesis_strategies import revisions
from swh.model.model import Content
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
def test_client(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
producer = Producer({
'bootstrap.servers': 'localhost:{}'.format(port),
'client.id': 'test producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
brokers='localhost:%d' % kafka_server[1],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
def test_client_eof(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
producer = Producer({
'bootstrap.servers': 'localhost:{}'.format(port),
'client.id': 'test producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
brokers='localhost:%d' % kafka_server[1],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=None,
stop_on_eof=True,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
batch_size: int,
):
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
producer = Producer({
'bootstrap.servers': 'localhost:{}'.format(port),
'client.id': 'test producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
# Fill Kafka
for content in contents:
producer.produce(
topic=kafka_prefix + '.content',
key=key_to_kafka(content.sha1),
value=value_to_kafka(content.to_dict()),
)
producer.flush()
client = JournalClient(
brokers=['localhost:%d' % kafka_server[1]],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=num_objects,
batch_size=batch_size,
)
collected_output: List[Dict] = []
def worker_fn(objects):
received = objects['content']
assert len(received) <= batch_size
collected_output.extend(received)
client.process(worker_fn)
assert collected_output == [content.to_dict() for content in contents]
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index cb74503..6dfb2db 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,244 +1,244 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import random
from subprocess import Popen
from typing import Tuple
import dateutil
from confluent_kafka import Producer
from hypothesis import strategies, given, settings
import pytest
from swh.storage import get_storage
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
from .conftest import OBJECT_TYPE_KEYS
from .utils import MockedJournalClient, MockedKafkaWriter
storage_config = {
'cls': 'pipeline',
'steps': [
{'cls': 'validate'},
{'cls': 'memory'},
]
}
def test_storage_play(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int]):
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
storage = get_storage(**storage_config)
producer = Producer({
'bootstrap.servers': 'localhost:{}'.format(port),
'client.id': 'test producer',
- 'enable.idempotence': 'true',
+ 'acks': 'all',
})
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
topic = kafka_prefix + '.' + object_type
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == 'content':
object_['ctime'] = now
elif object_type == 'origin_visit':
nb_visits += 1
object_['visit'] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key),
value=value_to_kafka(object_),
)
nb_sent += 1
producer.flush()
# Fill the storage from Kafka
replayer = JournalClient(
brokers='localhost:%d' % kafka_server[1],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
assert OBJECT_TYPE_KEYS['revision'][1] == \
list(storage.revision_get(
[rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
assert OBJECT_TYPE_KEYS['release'][1] == \
list(storage.release_get(
[rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
origins = list(storage.origin_get(
[orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
assert OBJECT_TYPE_KEYS['origin'][1] == \
[{'url': orig['url']} for orig in origins]
for origin in origins:
origin_url = origin['url']
expected_visits = [
{
**visit,
'origin': origin_url,
'date': dateutil.parser.parse(visit['date']),
}
for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
if visit['origin'] == origin['url']
]
actual_visits = list(storage.origin_visit_get(
origin_url))
for visit in actual_visits:
del visit['visit'] # opaque identifier
assert expected_visits == actual_visits
input_contents = OBJECT_TYPE_KEYS['content'][1]
contents = storage.content_get_metadata(
[cont['sha1'] for cont in input_contents])
assert len(contents) == len(input_contents)
assert contents == {cont['sha1']: [cont] for cont in input_contents}
def _test_write_replay_origin_visit(visits):
"""Helper function to write tests for origin_visit.
Each visit (a dict) given in the 'visits' argument will be sent to
a (mocked) kafka queue, which an in-memory-storage-backed replayer is
listening to.
Check that corresponding origin visits entities are present in the storage
and have correct values.
"""
queue = []
replayer = MockedJournalClient(queue)
writer = MockedKafkaWriter(queue)
# Note that flipping the order of these two insertions will crash
# the test, because the legacy origin format does not allow creating
# the origin when needed (type is missing)
writer.send('origin', 'foo', {
'url': 'http://example.com/',
'type': 'git',
})
for visit in visits:
writer.send('origin_visit', 'foo', visit)
queue_size = len(queue)
assert replayer.stop_after_objects is None
replayer.stop_after_objects = queue_size
storage = get_storage(**storage_config)
worker_fn = functools.partial(process_replay_objects, storage=storage)
replayer.process(worker_fn)
actual_visits = list(storage.origin_visit_get('http://example.com/'))
assert len(actual_visits) == len(visits), actual_visits
for vin, vout in zip(visits, actual_visits):
vin = vin.copy()
vout = vout.copy()
assert vout.pop('origin') == 'http://example.com/'
vin.pop('origin')
vin.setdefault('type', 'git')
vin.setdefault('metadata', None)
assert vin == vout
def test_write_replay_origin_visit():
"""Test origin_visit when the 'origin' is just a string."""
now = datetime.datetime.now()
visits = [{
'visit': 1,
'origin': 'http://example.com/',
'date': now,
'type': 'git',
'status': 'partial',
'snapshot': None,
}]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit1():
"""Test origin_visit when there is no type."""
now = datetime.datetime.now()
visits = [{
'visit': 1,
'origin': 'http://example.com/',
'date': now,
'status': 'partial',
'snapshot': None,
}]
with pytest.raises(ValueError, match='too old'):
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit2():
"""Test origin_visit when 'type' is missing from the visit, but not
from the origin."""
now = datetime.datetime.now()
visits = [{
'visit': 1,
'origin': {
'url': 'http://example.com/',
'type': 'git',
},
'date': now,
'type': 'git',
'status': 'partial',
'snapshot': None,
}]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit3():
"""Test origin_visit when the origin is a dict"""
now = datetime.datetime.now()
visits = [{
'visit': 1,
'origin': {
'url': 'http://example.com/',
},
'date': now,
'type': 'git',
'status': 'partial',
'snapshot': None,
}]
_test_write_replay_origin_visit(visits)
hash_strategy = strategies.binary(min_size=20, max_size=20)
@settings(max_examples=500)
@given(strategies.sets(hash_strategy, min_size=0, max_size=500),
strategies.sets(hash_strategy, min_size=10))
def test_is_hash_in_bytearray(haystack, needles):
array = b''.join(sorted(haystack))
needles |= haystack # Exhaustively test for all objects in the array
for needle in needles:
assert is_hash_in_bytearray(needle, array, len(haystack)) == \
(needle in haystack)
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 54a0117..648d20a 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,115 +1,115 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from confluent_kafka import Producer, KafkaException
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
class KafkaJournalWriter:
"""This class is instantiated and used by swh-storage to write incoming
new objects to Kafka before adding them to the storage backend
(e.g. PostgreSQL) itself."""
def __init__(self, brokers, prefix, client_id, producer_config=None):
self._prefix = prefix
if isinstance(brokers, str):
brokers = [brokers]
if not producer_config:
producer_config = {}
self.producer = Producer({
'bootstrap.servers': ','.join(brokers),
'client.id': client_id,
'on_delivery': self._on_delivery,
'error_cb': self._error_cb,
'logger': logger,
- 'enable.idempotence': 'true',
+ 'acks': 'all',
**producer_config,
})
def _error_cb(self, error):
if error.fatal():
raise KafkaException(error)
logger.info('Received non-fatal kafka error: %s', error)
def _on_delivery(self, error, message):
if error is not None:
self._error_cb(error)
def send(self, topic, key, value):
self.producer.produce(
topic=topic,
key=key_to_kafka(key),
value=value_to_kafka(value),
)
# Need to service the callbacks regularly by calling poll
self.producer.poll(0)
def flush(self):
self.producer.flush()
def _get_key(self, object_type, object_):
if object_type in ('revision', 'release', 'directory', 'snapshot'):
return object_['id']
elif object_type == 'content':
return object_['sha1'] # TODO: use a dict of hashes
elif object_type == 'skipped_content':
return {
hash: object_[hash]
for hash in DEFAULT_ALGORITHMS
}
elif object_type == 'origin':
return {'url': object_['url']}
elif object_type == 'origin_visit':
return {
'origin': object_['origin'],
'date': str(object_['date']),
}
else:
raise ValueError('Unknown object type: %s.' % object_type)
def _sanitize_object(self, object_type, object_):
if object_type == 'origin_visit':
return {
**object_,
'date': str(object_['date']),
}
elif object_type == 'origin':
assert 'id' not in object_
return object_
def _write_addition(self, object_type, object_):
"""Write a single object to the journal"""
if isinstance(object_, BaseModel):
object_ = object_.to_dict()
topic = '%s.%s' % (self._prefix, object_type)
key = self._get_key(object_type, object_)
dict_ = self._sanitize_object(object_type, object_)
logger.debug('topic: %s, key: %s, value: %s', topic, key, dict_)
self.send(topic, key=key, value=dict_)
def write_addition(self, object_type, object_):
"""Write a single object to the journal"""
self._write_addition(object_type, object_)
self.flush()
write_update = write_addition
def write_additions(self, object_type, objects):
"""Write a set of objects to the journal"""
for object_ in objects:
self._write_addition(object_type, object_)
self.flush()
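Taken together, these hunks swap the confluent_kafka producer setting 'enable.idempotence': 'true' for 'acks': 'all', both in the test fixtures and in KafkaJournalWriter. The snippet below is a minimal usage sketch of the writer defined in swh/journal/writer/kafka.py, not part of the diff; the broker address, topic prefix, client id and extra producer setting are illustrative assumptions.

from swh.journal.writer.kafka import KafkaJournalWriter

# Illustrative values only: broker address, prefix and client id are assumptions.
writer = KafkaJournalWriter(
    brokers=['localhost:9092'],
    prefix='swh.journal.objects',
    client_id='example-journal-writer',
    # After this change the producer is configured with 'acks': 'all' rather
    # than idempotent writes; other settings can still be passed explicitly.
    producer_config={'message.timeout.ms': 30000},
)

# write_addition() serializes the object, derives its Kafka key
# ({'url': ...} for origins) and flushes the producer.
writer.write_addition('origin', {'url': 'https://example.com/repo.git'})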