diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -57,10 +57,9 @@ if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): if object_type == 'content': - # TODO: we don't write contents in Kafka, so we need to - # find a way to insert them somehow. - object_['status'] = 'absent' - method = getattr(storage, object_type + '_add') + method = storage.content_add_metadata + else: + method = getattr(storage, object_type + '_add') method([object_]) elif object_type == 'origin_visit': origin_id = storage.origin_add_one(object_.pop('origin')) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime import random from subprocess import Popen from typing import Tuple @@ -33,12 +34,17 @@ client_id='test producer', ) + now = datetime.datetime.now(tz=datetime.timezone.utc) + # Fill Kafka nb_sent = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) + object_ = object_.copy() + if object_type == 'content': + object_['ctime'] = now producer.send(topic, key=key, value=object_) nb_sent += 1 @@ -79,4 +85,8 @@ for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits - # TODO: check for content + + contents = list(storage.content_get_metadata( + [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) + assert None not in contents + assert contents == OBJECT_TYPE_KEYS['content'][1]