# Reviewer notes (placed ahead of the first `diff --git` header; not part of any hunk).
#
# What this patch does, per file:
# - requirements-swh.txt: raises the minimum swh.storage version from 0.0.177 to 0.0.178.
# - swh/journal/replay.py:
#     * adds OriginVisit to the swh.model.model import list;
#     * registers 'origin_visit' -> OriginVisit.from_dict in object_converter_fn;
#     * replaces _fix_origin_visits (list of dicts in, list of dicts out) with
#       _fix_origin_visit (one dict in, one OriginVisit out via OriginVisit.from_dict);
#       the per-visit fix-up logic (type recovered from visit['origin']['type'],
#       dict origin flattened to its 'url', 'metadata' defaulted to None,
#       ValueError when no type can be recovered) is carried over unchanged;
#     * in the 'origin_visit' replay branch, builds the visits with a list
#       comprehension, reads the origin URL as the attribute v.origin instead of
#       v['origin'], and drops the FIXME about origin_visit_upsert (D2813).
# - swh/journal/tests/test_kafka_writer.py: passes origin_url positionally to the
#   visit-creating method and reads the visit id as visit.visit instead of
#   visit['visit'].
# - swh/journal/tests/test_write_replay.py: replaces the 'pipeline' storage config
#   (one 'memory' step) with a direct 'memory' config; converts every object through
#   object_converter_fn before dispatch, with the content -> skipped_content remap
#   moved ahead of that conversion; re-wraps one import in parentheses without
#   changing the imported names.
#
# NOTE(review): the added docstring summary for _fix_origin_visit still reads
#   "Adapt origin visits into a list of current storage compatible OriginVisits",
#   but the new function takes a single visit and returns a single OriginVisit.
# NOTE(review): an unchanged context line in that docstring says
#   "`origin['visit']['type']` exists"; the code checks visit['origin']['type'].
#   Fixing it needs an explicit -/+ pair (and updated hunk counts), so it is only
#   flagged here.
# NOTE(review): the line structure of this patch is collapsed (several diff lines
#   per physical line). Re-export it with the original line breaks before applying.
diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.60 -swh.storage >= 0.0.177 +swh.storage >= 0.0.178 diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -26,7 +26,7 @@ from swh.model.hashutil import hash_to_hex from swh.model.model import ( - BaseContent, BaseModel, Content, Directory, Origin, Revision, + BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, SHA1_SIZE, SkippedContent, Snapshot, Release ) from swh.objstorage.objstorage import ( @@ -46,6 +46,7 @@ object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { 'origin': Origin.from_dict, + 'origin_visit': OriginVisit.from_dict, 'snapshot': Snapshot.from_dict, 'revision': Revision.from_dict, 'release': Release.from_dict, @@ -205,45 +206,61 @@ return good_revisions -def _fix_origin_visits(visits: List[Dict]) -> List[Dict]: - """Adapt origin visits into a list of current storage compatible dicts. +def _fix_origin_visit(visit: Dict) -> OriginVisit: + """Adapt origin visits into a list of current storage compatible + OriginVisits. `visit['origin']` is a dict instead of an URL: + >>> from datetime import datetime, timezone >>> from pprint import pprint - >>> pprint(_fix_origin_visits([{ + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, + ... 'date': date, ... 'type': 'git', - ... }])) - [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}] + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: - >>> pprint(_fix_origin_visits([ - ... 
{'origin': {'type': 'hg', 'url': 'http://foo'} - ... }])) - [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}] - - """ - good_visits = [] - for visit in visits: - visit = visit.copy() - if 'type' not in visit: - if isinstance(visit['origin'], dict) and 'type' in visit['origin']: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit['type'] = visit['origin']['type'] - else: - # Very very old version of the schema: 'type' is missing, - # so there is nothing we can do to fix it. - raise ValueError('Got an origin_visit too old to be replayed.') - if isinstance(visit['origin'], dict): - # Old version of the schema: visit['origin'] was a dict. - visit['origin'] = visit['origin']['url'] - if 'metadata' not in visit: - visit['metadata'] = None - good_visits.append(visit) - return good_visits + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} + + """ # noqa + visit = visit.copy() + if 'type' not in visit: + if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key. + visit['type'] = visit['origin']['type'] + else: + # Very very old version of the schema: 'type' is missing, + # so there is nothing we can do to fix it. + raise ValueError('Got an origin_visit too old to be replayed.') + if isinstance(visit['origin'], dict): + # Old version of the schema: visit['origin'] was a dict. 
+ visit['origin'] = visit['origin']['url'] + if 'metadata' not in visit: + visit['metadata'] = None + return OriginVisit.from_dict(visit) def collision_aware_content_add( @@ -308,10 +325,8 @@ Revision.from_dict(r) for r in _fix_revisions(objects) ) elif object_type == 'origin_visit': - visits = _fix_origin_visits(objects) - storage.origin_add(Origin(url=v['origin']) for v in visits) - # FIXME: Should be List[OriginVisit], working on fixing - # swh.storage.origin_visit_upsert (D2813) + visits = [_fix_origin_visit(v) for v in objects] + storage.origin_add(Origin(url=v.origin) for v in visits) storage.origin_visit_upsert(visits) elif object_type in ('directory', 'release', 'snapshot', 'origin'): method = getattr(storage, object_type + '_add') diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -150,11 +150,10 @@ object_ = object_.copy() origin_url = object_.pop('origin') storage.origin_add_one(Origin(url=origin_url)) - visit = method(origin=origin_url, date=object_.pop('date'), + visit = method(origin_url, date=object_.pop('date'), type=object_.pop('type')) expected_messages += 1 - visit_id = visit['visit'] - storage.origin_visit_update(origin_url, visit_id, **object_) + storage.origin_visit_update(origin_url, visit.visit, **object_) expected_messages += 1 else: assert False, object_type diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -10,7 +10,9 @@ from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists -from swh.model.hypothesis_strategies import object_dicts, present_contents +from swh.model.hypothesis_strategies import ( + object_dicts, present_contents +) from swh.model.model import Origin from swh.storage import get_storage, HashCollision @@ -22,10 +24,8 @@ 
storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, - ] + 'cls': 'memory', + 'journal_writer': {'cls': 'memory'}, } @@ -67,16 +67,18 @@ # Write objects to storage1 for (obj_type, obj) in objects: - obj = obj.copy() + if obj_type == 'content' and obj.get('status') == 'absent': + obj_type = 'skipped_content' + + obj = object_converter_fn[obj_type](obj) + if obj_type == 'origin_visit': - storage1.origin_add_one(Origin(url=obj['origin'])) + storage1.origin_add_one(Origin(url=obj.origin)) storage1.origin_visit_upsert([obj]) else: - if obj_type == 'content' and obj.get('status') == 'absent': - obj_type = 'skipped_content' method = getattr(storage1, obj_type + '_add') try: - method([object_converter_fn[obj_type](obj)]) + method([obj]) except HashCollision: pass