diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -235,8 +235,7 @@ def _fix_origin_visit(visit: Dict) -> OriginVisit: - """Adapt origin visits into a list of current storage compatible - OriginVisits. + """Adapt origin visit into current storage compatible OriginVisit. `visit['origin']` is a dict instead of an URL: @@ -272,6 +271,28 @@ 'status': 'ongoing', 'type': 'hg'} + Old visit format (origin_visit with no type) raises: + + >>> _fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + >>> _fix_origin_visit({ + ... 'origin': 'http://foo', + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + """ # noqa visit = visit.copy() if 'type' not in visit: @@ -280,9 +301,12 @@ # but their 'origin' field was a dict with a 'type' key. visit['type'] = visit['origin']['type'] else: - # Very very old version of the schema: 'type' is missing, - # so there is nothing we can do to fix it. - raise ValueError('Got an origin_visit too old to be replayed.') + # Very old schema version: 'type' is missing, stop early + + # We expect the journal's origin_visit topic to no longer reference + # such visits. If it does, the replayer must crash so we can fix + # the journal's topic. + raise ValueError(f'Old origin visit format detected: {visit}') if isinstance(visit['origin'], dict): # Old version of the schema: visit['origin'] was a dict. visit['origin'] = visit['origin']['url'] @@ -356,8 +380,13 @@ revisions.append(rev) storage.revision_add(revisions) elif object_type == 'origin_visit': - visits = [_fix_origin_visit(v) for v in objects] - storage.origin_add(Origin(url=v.origin) for v in visits) + visits: List[OriginVisit] = [] + origins: List[Origin] = [] + for obj in objects: + visit = _fix_origin_visit(obj) + visits.append(visit) + origins.append(Origin(url=visit.origin)) + storage.origin_add(origins) storage.origin_visit_upsert(visits) elif object_type in ('directory', 'release', 'snapshot', 'origin'): method = getattr(storage, object_type + '_add') diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -8,12 +8,13 @@ import logging import random from subprocess import Popen -from typing import Dict, Tuple +from typing import Dict, List, Tuple import dateutil +import pytest + from confluent_kafka import Producer from hypothesis import strategies, given, settings -import pytest from swh.storage import get_storage @@ -276,7 +277,7 @@ assert expected_content_hashes in actual_colliding_hashes -def _test_write_replay_origin_visit(visits): +def _test_write_replay_origin_visit(visits: List[Dict]): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to @@ -284,10 +285,10 @@ listening to. Check that corresponding origin visits entities are present in the storage - and have correct values. + and have correct values if they are not skipped. """ - queue = [] + queue: List = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) @@ -339,17 +340,33 @@ def test_write_replay_legacy_origin_visit1(): - """Test origin_visit when there is no type.""" + """Origin_visit with no types should make the replayer crash + + We expect the journal's origin_visit topic to no longer reference such + visits. If it does, the replayer must crash so we can fix the journal's + topic. + + """ now = datetime.datetime.now() - visits = [{ + visit = { 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'status': 'partial', 'snapshot': None, - }] - with pytest.raises(ValueError, match='too old'): - _test_write_replay_origin_visit(visits) + } + now2 = datetime.datetime.now() + visit2 = { + 'visit': 2, + 'origin': {'url': 'http://example.com/'}, + 'date': now2, + 'status': 'partial', + 'snapshot': None, + } + + for origin_visit in [visit, visit2]: + with pytest.raises(ValueError, match='Old origin visit format'): + _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2():