diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,9 @@
 import copy
 import logging
 from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import (
+    Any, Callable, Dict, Iterable, List, Optional
+)
 
 from sentry_sdk import capture_exception, push_scope
 try:
@@ -22,7 +24,11 @@
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
-from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
+
+from swh.model.model import (
+    BaseContent, BaseModel, Content, Directory, Origin, Revision,
+    SHA1_SIZE, SkippedContent, Snapshot, Release
+)
 from swh.objstorage.objstorage import (
     ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
 )
@@ -38,6 +44,17 @@
 CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
 
 
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+    'origin': Origin.from_dict,
+    'snapshot': Snapshot.from_dict,
+    'revision': Revision.from_dict,
+    'release': Release.from_dict,
+    'directory': Directory.from_dict,
+    'content': Content.from_dict,
+    'skipped_content': SkippedContent.from_dict,
+}
+
+
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         logger.debug("Inserting %s %s objects", len(objects), object_type)
@@ -101,46 +118,8 @@
     return _check_date(rev['date']) and _check_date(rev['committer_date'])
 
 
-def _fix_revisions(revisions):
-    good_revisions = []
-    for rev in revisions:
-        rev = _fix_revision_pypi_empty_string(rev)
-        rev = _fix_revision_transplant_source(rev)
-        if not _check_revision_date(rev):
-            logging.warning('Excluding revision (invalid date): %r', rev)
-            continue
-        if rev not in good_revisions:
-            good_revisions.append(rev)
-    return good_revisions
-
-
-def _fix_origin_visits(visits):
-    good_visits = []
-    for visit in visits:
-        visit = visit.copy()
-        if 'type' not in visit:
-            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
-                # Very old version of the schema: visits did not have a type,
-                # but their 'origin' field was a dict with a 'type' key.
-                visit['type'] = visit['origin']['type']
-            else:
-                # Very very old version of the schema: 'type' is missing,
-                # so there is nothing we can do to fix it.
-                raise ValueError('Got an origin_visit too old to be replayed.')
-        if isinstance(visit['origin'], dict):
-            # Old version of the schema: visit['origin'] was a dict.
-            visit['origin'] = visit['origin']['url']
-        good_visits.append(visit)
-    return good_visits
-
-
-def fix_objects(object_type, objects):
-    """Converts a possibly old object from the journal to its current
-    expected format.
-
-    List of conversions:
-
-    Empty author name/email in PyPI releases:
+def _fix_revisions(revisions: List[Dict]) -> List[Dict]:
+    """Adapt revisions into a list of (current) storage compatible dicts.
 
     >>> from pprint import pprint
     >>> date = {
@@ -150,7 +129,7 @@
     ...     },
     ...     'offset': 0,
     ... }
-    >>> pprint(fix_objects('revision', [{
+    >>> pprint(_fix_revisions([{
    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
    ...     'date': date,
@@ -165,7 +144,7 @@
 
     Fix type of 'transplant_source' extra headers:
 
-    >>> revs = fix_objects('revision', [{
+    >>> revs = _fix_revisions([{
     ...     'author': {'email': '', 'fullname': b'', 'name': ''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
     ...     'date': date,
@@ -173,7 +152,7 @@
     ...     'metadata': {
     ...         'extra_headers': [
     ...             ['time_offset_seconds', b'-3600'],
-    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
+    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']  # noqa
     ...     ]}
     ... }])
     >>> pprint(revs[0]['metadata']['extra_headers'])
@@ -185,7 +164,7 @@
     >>> from copy import deepcopy
     >>> invalid_date1 = deepcopy(date)
     >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
     ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': invalid_date1,
@@ -195,7 +174,7 @@
 
     >>> invalid_date2 = deepcopy(date)
     >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 10^63
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
     ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': invalid_date2,
@@ -205,7 +184,7 @@
 
     >>> invalid_date3 = deepcopy(date)
     >>> invalid_date3['offset'] = 2**20  # > 10^15
-    >>> fix_objects('revision', [{
+    >>> _fix_revisions([{
     ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': date,
@@ -213,28 +192,58 @@
     ... }])
     []
 
+    """
+    good_revisions: List = []
+    for rev in revisions:
+        rev = _fix_revision_pypi_empty_string(rev)
+        rev = _fix_revision_transplant_source(rev)
+        if not _check_revision_date(rev):
+            logging.warning('Excluding revision (invalid date): %r', rev)
+            continue
+        if rev not in good_revisions:
+            good_revisions.append(rev)
+    return good_revisions
+
+
+def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
+    """Adapt origin visits into a list of current storage compatible dicts.
 
     `visit['origin']` is a dict instead of an URL:
 
-    >>> pprint(fix_objects('origin_visit', [{
+    >>> from pprint import pprint
+    >>> pprint(_fix_origin_visits([{
     ...     'origin': {'url': 'http://foo'},
     ...     'type': 'git',
     ... }]))
-    [{'origin': 'http://foo', 'type': 'git'}]
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]
 
     `visit['type']` is missing , but `origin['visit']['type']` exists:
 
-    >>> pprint(fix_objects('origin_visit', [
+    >>> pprint(_fix_origin_visits([
     ...     {'origin': {'type': 'hg', 'url': 'http://foo'}
     ... }]))
-    [{'origin': 'http://foo', 'type': 'hg'}]
-    """  # noqa
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
 
-    if object_type == 'revision':
-        objects = _fix_revisions(objects)
-    elif object_type == 'origin_visit':
-        objects = _fix_origin_visits(objects)
-    return objects
+    """
+    good_visits = []
+    for visit in visits:
+        visit = visit.copy()
+        if 'type' not in visit:
+            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+                # Very old version of the schema: visits did not have a type,
+                # but their 'origin' field was a dict with a 'type' key.
+                visit['type'] = visit['origin']['type']
+            else:
+                # Very very old version of the schema: 'type' is missing,
+                # so there is nothing we can do to fix it.
+                raise ValueError('Got an origin_visit too old to be replayed.')
+        if isinstance(visit['origin'], dict):
+            # Old version of the schema: visit['origin'] was a dict.
+            visit['origin'] = visit['origin']['url']
+        if 'metadata' not in visit:
+            visit['metadata'] = None
+        good_visits.append(visit)
+    return good_visits
 
 
 def collision_aware_content_add(
@@ -253,7 +262,7 @@
     colliding_content_hashes: Dict[str, List[Dict[str, bytes]]] = {}
     while True:
         try:
-            content_add_fn(c.to_dict() for c in contents)
+            content_add_fn(contents)
         except HashCollision as e:
             algo, hash_id, colliding_hashes = e.args
             hash_id = hash_to_hex(hash_id)
@@ -270,10 +279,13 @@
             })
 
 
-def _insert_objects(object_type, objects, storage):
-    objects = fix_objects(object_type, objects)
+def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
+    """Insert objects of type object_type in the storage.
+
+    """
     if object_type == 'content':
-        contents, skipped_contents = [], []
+        contents: List[BaseContent] = []
+        skipped_contents: List[BaseContent] = []
         for content in objects:
             c = BaseContent.from_dict(content)
             if isinstance(c, SkippedContent):
@@ -285,19 +297,19 @@
             storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(
             storage.content_add_metadata, contents)
-
-    elif object_type in ('directory', 'revision', 'release',
-                         'snapshot', 'origin'):
-        # TODO: split batches that are too large for the storage
-        # to handle?
-        method = getattr(storage, object_type + '_add')
-        method(objects)
+    elif object_type == 'revision':
+        storage.revision_add(
+            Revision.from_dict(r) for r in _fix_revisions(objects)
+        )
     elif object_type == 'origin_visit':
-        for visit in objects:
-            storage.origin_add_one({'url': visit['origin']})
-            if 'metadata' not in visit:
-                visit['metadata'] = None
-        storage.origin_visit_upsert(objects)
+        visits = _fix_origin_visits(objects)
+        storage.origin_add(Origin(url=v['origin']) for v in visits)
+        # FIXME: Should be List[OriginVisit], working on fixing
+        # swh.storage.origin_visit_upsert (D2813)
+        storage.origin_visit_upsert(visits)
+    elif object_type in ('directory', 'release', 'snapshot', 'origin'):
+        method = getattr(storage, object_type + '_add')
+        method(object_converter_fn[object_type](o) for o in objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
                        object_type)
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -51,7 +51,6 @@
     storage_config = {
         'cls': 'pipeline',
         'steps': [
-            {'cls': 'validate'},
             {'cls': 'memory'},
         ]
     }
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -8,14 +8,17 @@
 from confluent_kafka import Consumer, KafkaException
 from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
 
 from swh.storage import get_storage
 
-from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.replay import object_converter_fn
 from swh.journal.serializers import (
     kafka_to_key, kafka_to_value
 )
+from swh.journal.writer.kafka import KafkaJournalWriter
+
+from swh.model.model import Content, Origin, BaseModel
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
@@ -116,7 +119,6 @@
     storage_config = {
         'cls': 'pipeline',
         'steps': [
-            {'cls': 'validate'},
            {'cls': 'memory', 'journal_writer': writer_config},
        ]
    }
@@ -129,15 +131,25 @@
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision',
                            'release', 'snapshot', 'origin'):
+            objects_: List[BaseModel]
             if object_type == 'content':
-                objects = [{**obj, 'data': b''} for obj in objects]
-            method(objects)
+                objects_ = [
+                    Content.from_dict({
+                        **obj, 'data': b''})
+                    for obj in objects
+                ]
+            else:
+                objects_ = [
+                    object_converter_fn[object_type](obj)
+                    for obj in objects
+                ]
+            method(objects_)
             expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_url = object_.pop('origin')
-                storage.origin_add_one({'url': origin_url})
+                storage.origin_add_one(Origin(url=origin_url))
                 visit = method(origin=origin_url, date=object_.pop('date'),
                                type=object_.pop('type'))
                 expected_messages += 1
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -30,7 +30,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {'cls': 'validate'},
         {'cls': 'memory'},
     ]
 }
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -11,10 +11,12 @@
 from hypothesis.strategies import lists
 
 from swh.model.hypothesis_strategies import object_dicts, present_contents
+from swh.model.model import Origin
 from swh.storage import get_storage, HashCollision
 
-from swh.journal.replay import process_replay_objects
-from swh.journal.replay import process_replay_objects_content
+from swh.journal.replay import (
+    process_replay_objects, process_replay_objects_content, object_converter_fn
+)
 
 from .utils import MockedJournalClient, MockedKafkaWriter
 
@@ -22,7 +24,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {'cls': 'validate'},
         {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
     ]
 }
@@ -64,17 +65,18 @@
                return_value=MockedKafkaWriter(queue)):
         storage1 = get_storage(**storage_config)
 
+    # Write objects to storage1
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
-            storage1.origin_add_one({'url': obj['origin']})
+            storage1.origin_add_one(Origin(url=obj['origin']))
             storage1.origin_visit_upsert([obj])
         else:
             if obj_type == 'content' and obj.get('status') == 'absent':
                 obj_type = 'skipped_content'
             method = getattr(storage1, obj_type + '_add')
             try:
-                method([obj])
+                method([object_converter_fn[obj_type](obj)])
             except HashCollision:
                 pass
 
@@ -128,7 +130,6 @@
 
     contents = []
     for obj in objects:
-        obj = obj.to_dict()
         storage1.content_add([obj])
         contents.append(obj)
 
@@ -152,7 +153,7 @@
 
     # only content with status visible will be copied in storage2
     expected_objstorage_state = {
-        c['sha1']: c['data'] for c in contents if c['status'] == 'visible'
+        c.sha1: c.data for c in contents if c.status == 'visible'
     }
 
     assert expected_objstorage_state == objstorage2.state
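For illustration only, and not part of the patch above: a minimal sketch of how the new `object_converter_fn` mapping is exercised, turning a journal dict into a swh.model object the way the replayer and the updated tests do. The origin URL is made up; the sketch assumes only the `from_dict` constructors the mapping already points at.

    # Hypothetical usage sketch (not part of the diff above).
    from swh.journal.replay import object_converter_fn

    origin_dict = {'url': 'https://example.org/repo.git'}  # made-up origin
    origin = object_converter_fn['origin'](origin_dict)    # Origin.from_dict
    assert origin.url == origin_dict['url']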