diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -3,5 +3,23 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from typing import Callable, Dict
+
+from swh.model.model import (
+    BaseModel, Content, Directory, Origin,
+    Release, Revision, SkippedContent, Snapshot
+)
+
 # the default prefix for kafka's topics
 DEFAULT_PREFIX = 'swh.journal.objects'
+
+
+object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
+    'origin': Origin.from_dict,
+    'snapshot': Snapshot.from_dict,
+    'revision': Revision.from_dict,
+    'release': Release.from_dict,
+    'directory': Directory.from_dict,
+    'content': Content.from_dict,
+    'skipped_content': SkippedContent.from_dict,
+}
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,9 @@
 import copy
 import logging
 from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import (
+    Any, Callable, Dict, Iterable, List, Optional, Union, Sequence, Tuple
+)
 
 from sentry_sdk import capture_exception, push_scope
 try:
@@ -22,12 +24,18 @@
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
-from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
+
+from swh.model.model import (
+    BaseContent, BaseModel, Content, Origin,
+    Revision, SHA1_SIZE, SkippedContent
+)
 from swh.objstorage.objstorage import (
     ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
 )
 from swh.storage import HashCollision
 
+from swh.journal import object_converter_fn
+
 logger = logging.getLogger(__name__)
 
 GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
@@ -101,8 +109,11 @@
     return _check_date(rev['date']) and _check_date(rev['committer_date'])
 
 
-def _fix_revisions(revisions):
-    good_revisions = []
+def _fix_revisions(revisions: List[Dict]) -> List[Revision]:
+    """Fix revisions and convert them into a list of model objects
+
+    """
+    good_revisions: List[Revision] = []
     for rev in revisions:
         rev = _fix_revision_pypi_empty_string(rev)
         rev = _fix_revision_transplant_source(rev)
@@ -110,11 +121,14 @@
             logging.warning('Excluding revision (invalid date): %r', rev)
             continue
         if rev not in good_revisions:
-            good_revisions.append(rev)
+            good_revisions.append(Revision.from_dict(rev))
     return good_revisions
 
 
-def _fix_origin_visits(visits):
+def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
+    """Fix origin visits
+
+    """
     good_visits = []
     for visit in visits:
         visit = visit.copy()
@@ -130,19 +144,37 @@
         if isinstance(visit['origin'], dict):
             # Old version of the schema: visit['origin'] was a dict.
             visit['origin'] = visit['origin']['url']
+        if 'metadata' not in visit:
+            visit['metadata'] = None
+
+        # FIXME: Should be an OriginVisit; it is not because of the
+        # swh.storage.origin_visit_upsert inconsistency (to be fixed soon)
         good_visits.append(visit)
     return good_visits
 
 
-def fix_objects(object_type, objects):
-    """Converts a possibly old object from the journal to its current
+def fix_and_convert_objects(
+        object_type: str, objects: List[Dict]) -> Union[
+            List[Dict],
+            Tuple[List[SkippedContent], List[Content]],
+            Sequence[BaseModel]
+        ]:
+    """Converts a list of possibly old objects from the journal to their current
     expected format.
 
+    The result is inconsistent, but slowly converging on BaseModel objects:
+    - List[Dict]: origin visits on the storage side still need to be dicts
+    - Tuple[List, List]: contents are split into two lists of model
+      objects, SkippedContent and Content
+    - List[BaseModel]: the normalized format we are converging
+      toward.
+
     List of conversions:
 
     Empty author name/email in PyPI releases:
 
     >>> from pprint import pprint
+    >>> author = {'email': '', 'fullname': b'', 'name': ''}
     >>> date = {
     ...     'timestamp': {
     ...         'seconds': 1565096932,
@@ -150,33 +182,39 @@
     ...     },
     ...     'offset': 0,
     ... }
-    >>> pprint(fix_objects('revision', [{
-    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
-    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
+    >>> revs = fix_and_convert_objects('revision', [{
+    ...     'id': b'show-me-some-id',
+    ...     'author': author,
+    ...     'committer': author,
     ...     'date': date,
+    ...     'type': 'git',
+    ...     'message': 'commit',
+    ...     'synthetic': True,
+    ...     'directory': b'dir-id',
     ...     'committer_date': date,
-    ... }]))
-    [{'author': {'email': b'', 'fullname': b'', 'name': b''},
-      'committer': {'email': b'', 'fullname': b'', 'name': b''},
-      'committer_date': {'offset': 0,
-                         'timestamp': {'microseconds': 0, 'seconds': 1565096932}},
-      'date': {'offset': 0,
-               'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}]
+    ... }])
+    >>> isinstance(revs[0], Revision)
+    True
 
     Fix type of 'transplant_source' extra headers:
 
-    >>> revs = fix_objects('revision', [{
+    >>> revs = fix_and_convert_objects('revision', [{
+    ...     'id': b'some-id',
     ...     'author': {'email': '', 'fullname': b'', 'name': ''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
     ...     'date': date,
     ...     'committer_date': date,
+    ...     'type': 'tar',
+    ...     'message': 'commit',
+    ...     'synthetic': True,
+    ...     'directory': b'dir-id',
     ...     'metadata': {
     ...         'extra_headers': [
     ...             ['time_offset_seconds', b'-3600'],
     ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
     ...     ]}
     ... }])
-    >>> pprint(revs[0]['metadata']['extra_headers'])
+    >>> pprint(revs[0].metadata['extra_headers'])
     [['time_offset_seconds', b'-3600'],
      ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]
 
@@ -185,7 +223,7 @@
     >>> from copy import deepcopy
     >>> invalid_date1 = deepcopy(date)
     >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
-    >>> fix_objects('revision', [{
+    >>> fix_and_convert_objects('revision', [{
     ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': invalid_date1,
@@ -195,7 +233,7 @@
 
     >>> invalid_date2 = deepcopy(date)
     >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 2^63
-    >>> fix_objects('revision', [{
+    >>> fix_and_convert_objects('revision', [{
    ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': invalid_date2,
@@ -205,7 +243,7 @@
 
     >>> invalid_date3 = deepcopy(date)
     >>> invalid_date3['offset'] = 2**20  # > 2^15
-    >>> fix_objects('revision', [{
+    >>> fix_and_convert_objects('revision', [{
     ...     'author': {'email': '', 'fullname': b'', 'name': b''},
     ...     'committer': {'email': '', 'fullname': b'', 'name': b''},
     ...     'date': date,
@@ -216,25 +254,36 @@
 
     `visit['origin']` is a dict instead of a URL:
 
-    >>> pprint(fix_objects('origin_visit', [{
+    >>> pprint(fix_and_convert_objects('origin_visit', [{
     ...     'origin': {'url': 'http://foo'},
     ...     'type': 'git',
     ... }]))
-    [{'origin': 'http://foo', 'type': 'git'}]
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]
 
     `visit['type']` is missing, but `origin['visit']['type']` exists:
 
-    >>> pprint(fix_objects('origin_visit', [
+    >>> pprint(fix_and_convert_objects('origin_visit', [
     ...     {'origin': {'type': 'hg', 'url': 'http://foo'}
     ... }]))
-    [{'origin': 'http://foo', 'type': 'hg'}]
+    [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
+
     """  # noqa
+    if object_type == 'content':
+        contents, skipped_contents = [], []
+        for content in objects:
+            c = BaseContent.from_dict(content)
+            if isinstance(c, SkippedContent):
+                skipped_contents.append(c)
+            else:
+                contents.append(c)
 
-    if object_type == 'revision':
-        objects = _fix_revisions(objects)
+        return skipped_contents, contents
+    elif object_type == 'revision':
+        return _fix_revisions(objects)
     elif object_type == 'origin_visit':
-        objects = _fix_origin_visits(objects)
-    return objects
+        return _fix_origin_visits(objects)
+    else:
+        return [object_converter_fn[object_type](obj) for obj in objects]
 
 
 def collision_aware_content_add(
@@ -253,7 +302,7 @@
     colliding_content_hashes: Dict[str, List[Dict[str, bytes]]] = {}
     while True:
         try:
-            content_add_fn(c.to_dict() for c in contents)
+            content_add_fn(contents)
         except HashCollision as e:
             algo, hash_id, colliding_hashes = e.args
             hash_id = hash_to_hex(hash_id)
@@ -271,21 +320,13 @@
 
 
 def _insert_objects(object_type, objects, storage):
-    objects = fix_objects(object_type, objects)
+    objects = fix_and_convert_objects(object_type, objects)
     if object_type == 'content':
-        contents, skipped_contents = [], []
-        for content in objects:
-            c = BaseContent.from_dict(content)
-            if isinstance(c, SkippedContent):
-                skipped_contents.append(c)
-            else:
-                contents.append(c)
-
+        skipped_contents, contents = objects
         collision_aware_content_add(
             storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(
             storage.content_add_metadata, contents)
-
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
@@ -294,9 +335,8 @@
         method(objects)
     elif object_type == 'origin_visit':
         for visit in objects:
-            storage.origin_add_one({'url': visit['origin']})
-            if 'metadata' not in visit:
-                visit['metadata'] = None
+            storage.origin_add_one(Origin(url=visit['origin']))
+            # FIXME: Align the storage to accept base model objects...
         storage.origin_visit_upsert(objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -51,7 +51,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {'cls': 'validate'},
         {'cls': 'memory'},
     ]
 }
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -8,15 +8,18 @@
 
 from confluent_kafka import Consumer, KafkaException
 from subprocess import Popen
-from typing import Tuple
+from typing import List, Tuple
 
 from swh.storage import get_storage
 
+from swh.journal import object_converter_fn
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (
     kafka_to_key, kafka_to_value
 )
+
+from swh.model.model import Content, Origin, BaseModel
+
 from .conftest import OBJECT_TYPE_KEYS
@@ -116,7 +119,6 @@
     storage_config = {
         'cls': 'pipeline',
         'steps': [
-            {'cls': 'validate'},
            {'cls': 'memory', 'journal_writer': writer_config},
        ]
    }
@@ -129,15 +131,25 @@
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
+            objects_: List[BaseModel]
             if object_type == 'content':
-                objects = [{**obj, 'data': b''} for obj in objects]
-            method(objects)
+                objects_ = [
+                    Content.from_dict({
+                        **obj, 'data': b''})
+                    for obj in objects
+                ]
+            else:
+                objects_ = [
+                    object_converter_fn[object_type](obj)
+                    for obj in objects
+                ]
+            method(objects_)
             expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_url = object_.pop('origin')
-                storage.origin_add_one({'url': origin_url})
+                storage.origin_add_one(Origin(url=origin_url))
                 visit = method(origin=origin_url,
                                date=object_.pop('date'),
                                type=object_.pop('type'))
                 expected_messages += 1
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -30,7 +30,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {'cls': 'validate'},
         {'cls': 'memory'},
     ]
 }
@@ -95,9 +94,7 @@
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
-    nb_inserted = 0
-    while nb_inserted < nb_sent:
-        nb_inserted += replayer.process(worker_fn)
+    nb_inserted = replayer.process(worker_fn)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
@@ -211,9 +208,7 @@
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
-    nb_inserted = 0
-    while nb_inserted < nb_sent:
-        nb_inserted += replayer.process(worker_fn)
+    nb_inserted = replayer.process(worker_fn)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -11,10 +11,14 @@
 from hypothesis.strategies import lists
 
 from swh.model.hypothesis_strategies import object_dicts, present_contents
+from swh.model.model import Origin
 from swh.storage import get_storage, HashCollision
 
-from swh.journal.replay import process_replay_objects
-from swh.journal.replay import process_replay_objects_content
+from swh.journal.replay import (
+    process_replay_objects, process_replay_objects_content
+)
+
+from swh.journal import object_converter_fn
 
 from .utils import MockedJournalClient, MockedKafkaWriter
@@ -22,7 +26,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {'cls': 'validate'},
         {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
     ]
 }
@@ -64,17 +67,18 @@
                return_value=MockedKafkaWriter(queue)):
         storage1 = get_storage(**storage_config)
 
+    # Write objects to storage1
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
-            storage1.origin_add_one({'url': obj['origin']})
+            storage1.origin_add_one(Origin(url=obj['origin']))
             storage1.origin_visit_upsert([obj])
         else:
             if obj_type == 'content' and obj.get('status') == 'absent':
                 obj_type = 'skipped_content'
             method = getattr(storage1, obj_type + '_add')
             try:
-                method([obj])
+                method([object_converter_fn[obj_type](obj)])
             except HashCollision:
                 pass
@@ -128,7 +132,7 @@
 
     contents = []
     for obj in objects:
-        obj = obj.to_dict()
+        obj = obj
         storage1.content_add([obj])
         contents.append(obj)
@@ -152,7 +156,7 @@
 
     # only content with status visible will be copied in storage2
     expected_objstorage_state = {
-        c['sha1']: c['data'] for c in contents if c['status'] == 'visible'
+        c.sha1: c.data for c in contents if c.status == 'visible'
    }
 
     assert expected_objstorage_state == objstorage2.state
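
Usage sketch (not part of the patch): the object_converter_fn mapping added in swh/journal/__init__.py turns a journal message dict into the matching swh.model.model object, keyed by object type; this is the dispatch the final `else:` branch of fix_and_convert_objects() performs for directory, revision, release, snapshot and origin messages. A minimal illustration, assuming the patch is applied and swh.journal / swh.model are importable; the origin URL is made up:

    from swh.journal import object_converter_fn

    # A payload as it would appear on the 'origin' topic (hypothetical URL).
    origin_dict = {'url': 'https://example.org/some/repo.git'}

    # Look up the converter by object type and build the model object.
    origin = object_converter_fn['origin'](origin_dict)
    assert origin.url == 'https://example.org/some/repo.git'

    # Model objects serialize back to dicts for the journal writer; only the
    # 'url' field is compared here since the exact dict layout may vary.
    assert origin.to_dict()['url'] == origin_dict['url']

Note that 'content' is the one type fix_and_convert_objects() does not route through this mapping: it goes through BaseContent.from_dict so the batch can be split into SkippedContent and Content lists before the separate storage.skipped_content_add and storage.content_add_metadata calls.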