diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,9 +3,10 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from kafka import KafkaConsumer
 import logging
 
+from kafka import KafkaConsumer
+
 from .serializers import kafka_to_key, kafka_to_value
 from swh.journal import DEFAULT_PREFIX
 
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -35,12 +35,27 @@
         method = getattr(storage, object_type + '_add')
         method(objects)
     elif object_type == 'origin_visit':
-        storage.origin_visit_upsert([
-            {
-                **obj,
-                'origin': storage.origin_add_one(obj['origin'])
-            }
-            for obj in objects])
+        for visit in objects:
+            if isinstance(visit['origin'], str):
+                # old format; note that it will crash with the pg and
+                # in-mem storages if the origin is not already known,
+                # but there is no other choice because we can't add an
+                # origin without knowing its type. Non-pg storages
+                # don't use a numeric FK internally.
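+                # For illustration (values mirror the tests added in this
+                # patch), a legacy message looks like
+                #   {'visit': 1, 'origin': 'http://example.com/',
+                #    'date': now}
+                # while the current format nests the origin as
+                #   {'origin': {'url': '...', 'type': 'git'}, ...}.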
+                visit['origin'] = {'url': visit['origin']}
+            else:
+                storage.origin_add_one(visit['origin'])
+                if 'type' not in visit:
+                    # old format
+                    visit['type'] = visit['origin']['type']
+
+        storage.origin_visit_upsert(objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
                        object_type)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -13,12 +13,14 @@
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
+from swh.storage.in_memory import ENABLE_ORIGIN_IDS
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects
 
 from .conftest import OBJECT_TYPE_KEYS
+from .utils import MockedJournalClient, MockedKafkaWriter
 
 
 def test_storage_play(
@@ -100,3 +102,97 @@
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
+
+
+def test_write_replay_legacy_origin_visit1():
+    """Test origin_visit when the 'origin' is just a string."""
+    queue = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    # Note that flipping the order of these two insertions will crash
+    # the test, because the legacy origin format does not allow creating
+    # the origin when needed (its type is missing)
+    now = datetime.datetime.now()
+    writer.send('origin', 'foo', {
+        'url': 'http://example.com/',
+        'type': 'git',
+    })
+    writer.send('origin_visit', 'foo', {
+        'visit': 1,
+        'origin': 'http://example.com/',
+        'date': now,
+    })
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage = get_storage('memory', {})
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    visits = list(storage.origin_visit_get('http://example.com/'))
+
+    if ENABLE_ORIGIN_IDS:
+        assert visits == [{
+            'visit': 1,
+            'origin': 1,
+            'date': now,
+        }]
+    else:
+        assert visits == [{
+            'visit': 1,
+            'origin': {'url': 'http://example.com/'},
+            'date': now,
+        }]
+
+
+def test_write_replay_legacy_origin_visit2():
+    """Test origin_visit when 'type' is missing."""
+    queue = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    now = datetime.datetime.now()
+    writer.send('origin', 'foo', {
+        'url': 'http://example.com/',
+        'type': 'git',
+    })
+    writer.send('origin_visit', 'foo', {
+        'visit': 1,
+        'origin': {
+            'url': 'http://example.com/',
+            'type': 'git',
+        },
+        'date': now,
+    })
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage = get_storage('memory', {})
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    visits = list(storage.origin_visit_get('http://example.com/'))
+
+    if ENABLE_ORIGIN_IDS:
+        assert visits == [{
+            'visit': 1,
+            'origin': 1,
+            'date': now,
+            'type': 'git',
+        }]
+    else:
+        assert visits == [{
+            'visit': 1,
+            'origin': {'url': 'http://example.com/'},
+            'date': now,
+            'type': 'git',
+        }]
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from collections import namedtuple
 import functools
 
 from hypothesis import given, settings, HealthCheck
@@ -13,51 +12,10 @@
 from swh.storage.in_memory import Storage
 from swh.storage import HashCollision
 
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
-from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
-from swh.journal.serializers import (
-    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
-
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-
-
-class MockedKafkaWriter(DirectKafkaWriter):
-    def __init__(self, queue):
-        self._prefix = 'prefix'
-        self.queue = queue
-
-    def send(self, topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if self.queue and {partition} == set(self.queue[-1]):
-            # The last message is of the same object type, groupping them
-            self.queue[-1][partition].append(msg)
-        else:
-            self.queue.append({partition: [msg]})
-
-
-class MockedKafkaConsumer:
-    def __init__(self, queue):
-        self.queue = queue
-        self.committed = False
-
-    def poll(self):
-        return self.queue.pop(0)
-
-    def commit(self):
-        if self.queue == []:
-            self.committed = True
-
-
-class MockedJournalClient(JournalClient):
-    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
-        self._object_types = object_types
-        self.consumer = MockedKafkaConsumer(queue)
+
+from .utils import MockedJournalClient, MockedKafkaWriter
 
 
 @given(lists(object_dicts(), min_size=1))
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/utils.py
@@ -0,0 +1,53 @@
+from collections import namedtuple
+
+from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.serializers import (
+    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+
+FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
+FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+
+
+class MockedKafkaWriter(DirectKafkaWriter):
+    def __init__(self, queue):
+        self._prefix = 'prefix'
+        self.queue = queue
+
+    def send(self, topic, key, value):
+        key = kafka_to_key(key_to_kafka(key))
+        value = kafka_to_value(value_to_kafka(value))
+        partition = FakeKafkaPartition(topic)
+        msg = FakeKafkaMessage(key=key, value=value)
+        if self.queue and {partition} == set(self.queue[-1]):
+            # The last message is of the same object type; group them together
+            self.queue[-1][partition].append(msg)
+        else:
+            self.queue.append({partition: [msg]})
+
+
+class MockedKafkaConsumer:
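+    """Mimics the minimal KafkaConsumer interface used by JournalClient:
+    poll() returns one {partition: [messages]} batch at a time, and
+    commit() records that the whole queue has been consumed."""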
+    def __init__(self, queue):
+        self.queue = queue
+        self.committed = False
+
+    def poll(self):
+        return self.queue.pop(0)
+
+    def commit(self):
+        if self.queue == []:
+            self.committed = True
+
+
+class MockedJournalClient(JournalClient):
+    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+        self._object_types = object_types
+        self.consumer = MockedKafkaConsumer(queue)
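+
+
+# Note: MockedJournalClient deliberately skips JournalClient.__init__, so
+# no Kafka connection is attempted; tests reuse the real process() loop
+# with MockedKafkaConsumer standing in for the Kafka consumer.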