diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -235,9 +235,8 @@
     return Revision.from_dict(rev)
 
 
-def _fix_origin_visit(visit: Dict) -> OriginVisit:
-    """Adapt origin visits into a list of current storage compatible
-    OriginVisits.
+def _fix_origin_visit(visit: Dict) -> Optional[OriginVisit]:
+    """Adapt origin visit into current storage compatible OriginVisit.
 
     `visit['origin']` is a dict instead of an URL:
 
@@ -273,6 +272,24 @@
      'status': 'ongoing',
      'type': 'hg'}
 
+    Old visit format (origin_visit with no type) is dropped:
+
+    >>> pprint(_fix_origin_visit(
+    ...     {'origin': {'url': 'http://foo'},
+    ...      'date': date,
+    ...      'status': 'ongoing',
+    ...      'snapshot': None,
+    ...     }))
+    None
+
+    >>> pprint(_fix_origin_visit({
+    ...     'origin': 'http://foo',
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None,
+    ... }))
+    None
+
     """  # noqa
     visit = visit.copy()
     if 'type' not in visit:
@@ -281,9 +298,13 @@
             # but their 'origin' field was a dict with a 'type' key.
             visit['type'] = visit['origin']['type']
         else:
-            # Very very old version of the schema: 'type' is missing,
-            # so there is nothing we can do to fix it.
-            raise ValueError('Got an origin_visit too old to be replayed.')
+            # Very old schema version: 'type' is missing, nothing to do, skip
+            logger.warning(
+                'Old origin visit format detected, skipping: %(visit)s', {
+                    'visit': visit
+                }
+            )
+            return None
     if isinstance(visit['origin'], dict):
         # Old version of the schema: visit['origin'] was a dict.
         visit['origin'] = visit['origin']['url']
@@ -357,8 +378,14 @@
                 revisions.append(rev)
         storage.revision_add(revisions)
     elif object_type == 'origin_visit':
-        visits = [_fix_origin_visit(v) for v in objects]
-        storage.origin_add(Origin(url=v.origin) for v in visits)
+        visits: List[OriginVisit] = []
+        origins: List[Origin] = []
+        for obj in objects:
+            visit = _fix_origin_visit(obj)
+            if visit is not None:
+                visits.append(visit)
+                origins.append(Origin(url=visit.origin))
+        storage.origin_add(origins)
         storage.origin_visit_upsert(visits)
     elif object_type in ('directory', 'release', 'snapshot', 'origin'):
         method = getattr(storage, object_type + '_add')
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -8,12 +8,11 @@
 import logging
 import random
 from subprocess import Popen
-from typing import Dict, Tuple
+from typing import Dict, List, Tuple
 
 import dateutil
 from confluent_kafka import Producer
 from hypothesis import strategies, given, settings
-import pytest
 
 from swh.storage import get_storage
 
@@ -276,7 +275,8 @@
     assert expected_content_hashes in actual_colliding_hashes
 
 
-def _test_write_replay_origin_visit(visits):
+def _test_write_replay_origin_visit(
+        visits: List[Dict], with_skipped_visits: bool = False):
     """Helper function to write tests for origin_visit.
 
     Each visit (a dict) given in the 'visits' argument will be sent to
@@ -284,10 +284,12 @@
     listening to.
 
     Check that corresponding origin visits entities are present in the storage
-    and have correct values.
+    and have correct values. When `with_skipped_visits` is True, every
+    given visit is expected to be in a legacy format the replayer skips,
+    so the storage must end up with no visit at all.
 
     """
-    queue = []
+    queue: List = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)
 
@@ -312,16 +314,19 @@
 
     actual_visits = list(storage.origin_visit_get('http://example.com/'))
 
-    assert len(actual_visits) == len(visits), actual_visits
+    if with_skipped_visits:
+        assert len(actual_visits) == 0, 'Old format visits are dropped'
+    else:
+        assert len(actual_visits) == len(visits), actual_visits
 
-    for vin, vout in zip(visits, actual_visits):
-        vin = vin.copy()
-        vout = vout.copy()
-        assert vout.pop('origin') == 'http://example.com/'
-        vin.pop('origin')
-        vin.setdefault('type', 'git')
-        vin.setdefault('metadata', None)
-        assert vin == vout
+        for vin, vout in zip(visits, actual_visits):
+            vin = vin.copy()
+            vout = vout.copy()
+            assert vout.pop('origin') == 'http://example.com/'
+            vin.pop('origin')
+            vin.setdefault('type', 'git')
+            vin.setdefault('metadata', None)
+            assert vin == vout
 
 
 def test_write_replay_origin_visit():
@@ -338,18 +343,31 @@
     _test_write_replay_origin_visit(visits)
 
 
-def test_write_replay_legacy_origin_visit1():
-    """Test origin_visit when there is no type."""
+def test_write_replay_legacy_origin_visit1(caplog):
+    """Origin visits with no type are skipped, with a warning, when replayed.
+
+    """
     now = datetime.datetime.now()
-    visits = [{
+    visit = {
         'visit': 1,
         'origin': 'http://example.com/',
         'date': now,
         'status': 'partial',
         'snapshot': None,
-    }]
-    with pytest.raises(ValueError, match='too old'):
-        _test_write_replay_origin_visit(visits)
+    }
+    now2 = datetime.datetime.now()
+    visit2 = {
+        'visit': 2,
+        'origin': {'url': 'http://example.com/'},
+        'date': now2,
+        'status': 'partial',
+        'snapshot': None,
+    }
+    _test_write_replay_origin_visit([visit, visit2], with_skipped_visits=True)
+
+    skipped = [record for record in caplog.records
+               if 'Old origin visit format detected' in record.getMessage()]
+    assert len(skipped) == 2, skipped
 
 
 def test_write_replay_legacy_origin_visit2():