diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -3,9 +3,10 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from kafka import KafkaConsumer import logging +from kafka import KafkaConsumer + from .serializers import kafka_to_key, kafka_to_value from swh.journal import DEFAULT_PREFIX diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -35,12 +35,21 @@ method = getattr(storage, object_type + '_add') method(objects) elif object_type == 'origin_visit': - storage.origin_visit_upsert([ - { - **obj, - 'origin': storage.origin_add_one(obj['origin']) - } - for obj in objects]) + for visit in objects: + if isinstance(visit['origin'], str): + # old format; note that it will crash with the pg and + # in-mem storages if the origin is not already known, + # but there is no other choice because we can't add an + # origin without knowing its type. Non-pg storages + # don't use a numeric FK internally. + visit['origin'] = {'url': visit['origin']} + else: + storage.origin_add_one(visit['origin']) + if 'type' not in visit: + # old format + visit['type'] = visit['origin']['type'] + + storage.origin_visit_upsert(objects) else: logger.warning('Received a series of %s, this should not happen', object_type)