diff --git a/swh/journal/replay.py b/swh/journal/replay.py index d3edb4a..ec84d5c 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,72 +1,71 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from kafka import KafkaConsumer from .serializers import kafka_to_value logger = logging.getLogger(__name__) OBJECT_TYPES = frozenset([ 'origin', 'origin_visit', 'snapshot', 'release', 'revision', 'directory', 'content', ]) class StorageReplayer: def __init__(self, brokers, prefix, consumer_id, object_types=OBJECT_TYPES): if not set(object_types).issubset(OBJECT_TYPES): raise ValueError('Unknown object types: %s' % ', '.join( set(object_types) - OBJECT_TYPES)) self._object_types = object_types self.consumer = KafkaConsumer( bootstrap_servers=brokers, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=False, group_id=consumer_id, ) self.consumer.subscribe( topics=['%s.%s' % (prefix, object_type) for object_type in object_types], ) def fill(self, storage, max_messages=None): num = 0 for message in self.consumer: object_type = message.topic.split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type self.insert_object(storage, object_type, message.value) num += 1 if max_messages and num >= max_messages: break return num def insert_object(self, storage, object_type, object_): if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): if object_type == 'content': - # TODO: we don't write contents in Kafka, so we need to - # find a way to insert them somehow. - object_['status'] = 'absent' - method = getattr(storage, object_type + '_add') + method = storage.content_add_metadata + else: + method = getattr(storage, object_type + '_add') method([object_]) elif object_type == 'origin_visit': origin_id = storage.origin_add_one(object_.pop('origin')) visit = storage.origin_visit_add( origin=origin_id, date=object_.pop('date')) storage.origin_visit_update( origin_id, visit['visit'], **object_) else: assert False diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 03e9872..3e2f133 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,82 +1,92 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime import random from subprocess import Popen from typing import Tuple import dateutil from kafka import KafkaProducer from swh.storage import get_storage from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import StorageReplayer from .conftest import OBJECT_TYPE_KEYS def test_storage_play( kafka_prefix: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage('memory', {}) producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=value_to_kafka, client_id='test producer', ) + now = datetime.datetime.now(tz=datetime.timezone.utc) + # Fill Kafka nb_sent = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) + object_ = object_.copy() + if object_type == 'content': + object_['ctime'] = now producer.send(topic, key=key, value=object_) nb_sent += 1 # Fill the storage from Kafka config = { 'brokers': 'localhost:%d' % kafka_server[1], 'consumer_id': 'replayer', 'prefix': kafka_prefix, } replayer = StorageReplayer(**config) nb_inserted = replayer.fill(storage, max_messages=nb_sent) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url'], 'type': orig['type']} for orig in origins] for origin in origins: expected_visits = [ { **visit, 'origin': origin['id'], 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin']['url'] == origin['url'] and visit['origin']['type'] == origin['type'] ] actual_visits = list(storage.origin_visit_get(origin['id'])) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits - # TODO: check for content + + contents = list(storage.content_get_metadata( + [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) + assert None not in contents + assert contents == OBJECT_TYPE_KEYS['content'][1]