diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py
index f3f5797..a3ff5b1 100644
--- a/swh/journal/fixer.py
+++ b/swh/journal/fixer.py
@@ -1,268 +1,290 @@
import copy
import logging
from typing import Any, Dict, List, Optional
from swh.model.identifiers import normalize_timestamp

logger = logging.getLogger(__name__)


def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]:
    """Filters out the invalid 'perms' key that leaked from
    swh.model.from_disk to the journal.

    >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'})
    {'sha1_git': b'foo'}

    >>> _fix_content({'sha1_git': b'bar'})
    {'sha1_git': b'bar'}

    """
    content = content.copy()
    content.pop('perms', None)
    return content


def _fix_revision_pypi_empty_string(rev):
    """PyPI loader failed to encode empty strings as bytes, see:
    swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
    or https://forge.softwareheritage.org/D1772
    """
    rev = {
        **rev,
        'author': rev['author'].copy(),
        'committer': rev['committer'].copy(),
    }
    if rev['author'].get('email') == '':
        rev['author']['email'] = b''
    if rev['author'].get('name') == '':
        rev['author']['name'] = b''
    if rev['committer'].get('email') == '':
        rev['committer']['email'] = b''
    if rev['committer'].get('name') == '':
        rev['committer']['name'] = b''
    return rev


def _fix_revision_transplant_source(rev):
    if rev.get('metadata') and rev['metadata'].get('extra_headers'):
        rev = copy.deepcopy(rev)
        rev['metadata']['extra_headers'] = [
            [key, value.encode('ascii')]
            if key == 'transplant_source' and isinstance(value, str)
            else [key, value]
            for (key, value) in rev['metadata']['extra_headers']]
    return rev


def _check_date(date):
    """Returns whether the date can be represented in backends with sane
    limits on timestamps and timezones (resp. signed 64 bits and signed
    16 bits), and whether microseconds is valid (i.e. between 0 and 10^6).
    """
    if date is None:
        return True
    date = normalize_timestamp(date)
    return (-2**63 <= date['timestamp']['seconds'] < 2**63) \
        and (0 <= date['timestamp']['microseconds'] < 10**6) \
        and (-2**15 <= date['offset'] < 2**15)


def _check_revision_date(rev):
    """Exclude revisions with invalid dates.
    See https://forge.softwareheritage.org/T1339"""
    return _check_date(rev['date']) and _check_date(rev['committer_date'])


def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]:
    """Fix various legacy revision issues.

    Fix author/committer person:

    >>> from pprint import pprint
    >>> date = {
    ...     'timestamp': {
    ...         'seconds': 1565096932,
    ...         'microseconds': 0,
    ...     },
    ...     'offset': 0,
    ... }
    >>> rev0 = _fix_revision({
    ...     'id': b'rev-id',
    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
    ...     'date': date,
    ...     'committer_date': date,
    ...     'type': 'git',
    ...     'message': '',
    ...     'directory': b'dir-id',
    ...     'synthetic': False,
    ... })
    >>> rev0['author']
    {'fullname': b'', 'name': b'', 'email': b''}
    >>> rev0['committer']
    {'fullname': b'', 'name': b'', 'email': b''}

    Fix type of 'transplant_source' extra headers:

    >>> rev1 = _fix_revision({
    ...     'id': b'rev-id',
    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
    ...     'date': date,
    ...     'committer_date': date,
    ...     'metadata': {
    ...         'extra_headers': [
    ...             ['time_offset_seconds', b'-3600'],
    ...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
    ...         ]},
    ...     'type': 'git',
    ...     'message': '',
    ...     'directory': b'dir-id',
    ...     'synthetic': False,
    ... })
    >>> pprint(rev1['metadata']['extra_headers'])
    [['time_offset_seconds', b'-3600'],
     ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]

    Revisions with an invalid date are filtered out:

    >>> from copy import deepcopy
    >>> invalid_date1 = deepcopy(date)
    >>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
    >>> rev = _fix_revision({
    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
    ...     'date': invalid_date1,
    ...     'committer_date': date,
    ... })
    >>> rev is None
    True

    >>> invalid_date2 = deepcopy(date)
    >>> invalid_date2['timestamp']['seconds'] = 2**70  # > 2^63
    >>> rev = _fix_revision({
    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
    ...     'date': invalid_date2,
    ...     'committer_date': date,
    ... })
    >>> rev is None
    True

    >>> invalid_date3 = deepcopy(date)
    >>> invalid_date3['offset'] = 2**20  # > 2^15
    >>> rev = _fix_revision({
    ...     'author': {'fullname': b'', 'name': '', 'email': ''},
    ...     'committer': {'fullname': b'', 'name': '', 'email': ''},
    ...     'date': date,
    ...     'committer_date': invalid_date3,
    ... })
    >>> rev is None
    True

    """  # noqa
    rev = _fix_revision_pypi_empty_string(revision)
    rev = _fix_revision_transplant_source(rev)
    if not _check_revision_date(rev):
        logger.warning('Invalid revision date detected: %(revision)s', {
            'revision': rev
        })
        return None
    return rev


+def _fix_origin(origin: Dict) -> Dict:
+    """Fix legacy origins by dropping the 'type' field, which is no longer
+    part of the model.
+
+    >>> from pprint import pprint
+    >>> pprint(_fix_origin({
+    ...     'url': 'http://foo',
+    ... }))
+    {'url': 'http://foo'}
+    >>> pprint(_fix_origin({
+    ...     'url': 'http://bar',
+    ...     'type': 'foo',
+    ... }))
+    {'url': 'http://bar'}
+
+    """
+    o = origin.copy()
+    o.pop('type', None)
+    return o
+
+
def _fix_origin_visit(visit: Dict) -> Dict:
    """Fix various legacy origin visit issues.

    `visit['origin']` is a dict instead of a URL:

    >>> from datetime import datetime, timezone
    >>> from pprint import pprint
    >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
    >>> pprint(_fix_origin_visit({
    ...     'origin': {'url': 'http://foo'},
    ...     'date': date,
    ...     'type': 'git',
    ...     'status': 'ongoing',
    ...     'snapshot': None,
    ... }))
    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
     'metadata': None,
     'origin': 'http://foo',
     'snapshot': None,
     'status': 'ongoing',
     'type': 'git'}

    `visit['type']` is missing, but `visit['origin']['type']` exists:

    >>> pprint(_fix_origin_visit(
    ...     {'origin': {'type': 'hg', 'url': 'http://foo'},
    ...      'date': date,
    ...      'status': 'ongoing',
    ...      'snapshot': None,
    ...     }))
    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
     'metadata': None,
     'origin': 'http://foo',
     'snapshot': None,
     'status': 'ongoing',
     'type': 'hg'}

    Old visit format (origin_visit with no type) raises:

    >>> _fix_origin_visit({
    ...     'origin': {'url': 'http://foo'},
    ...     'date': date,
    ...     'status': 'ongoing',
    ...     'snapshot': None
    ... })
    Traceback (most recent call last):
    ...
    ValueError: Old origin visit format detected...

    >>> _fix_origin_visit({
    ...     'origin': 'http://foo',
    ...     'date': date,
    ...     'status': 'ongoing',
    ...     'snapshot': None
    ... })
    Traceback (most recent call last):
    ...
    ValueError: Old origin visit format detected...
""" # noqa visit = visit.copy() if 'type' not in visit: if isinstance(visit['origin'], dict) and 'type' in visit['origin']: # Very old version of the schema: visits did not have a type, # but their 'origin' field was a dict with a 'type' key. visit['type'] = visit['origin']['type'] else: # Very old schema version: 'type' is missing, stop early # We expect the journal's origin_visit topic to no longer reference # such visits. If it does, the replayer must crash so we can fix # the journal's topic. raise ValueError(f'Old origin visit format detected: {visit}') if isinstance(visit['origin'], dict): # Old version of the schema: visit['origin'] was a dict. visit['origin'] = visit['origin']['url'] if 'metadata' not in visit: visit['metadata'] = None return visit def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: """ Fix legacy objects from the journal to bring them up to date with the latest storage schema. """ if object_type == 'content': return [_fix_content(v) for v in objects] elif object_type == 'revision': revisions = [_fix_revision(v) for v in objects] return [rev for rev in revisions if rev is not None] + elif object_type == 'origin': + return [_fix_origin(v) for v in objects] elif object_type == 'origin_visit': return [_fix_origin_visit(v) for v in objects] else: return objects diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 8c0bd2b..6157617 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,416 +1,417 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import logging import random from subprocess import Popen from typing import Dict, List, Tuple import dateutil import pytest from confluent_kafka import Producer from hypothesis import strategies, given, settings from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from swh.model.hashutil import hash_to_hex from swh.model.model import Content from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'memory'}, ] } def make_topic(kafka_prefix: str, object_type: str) -> str: return kafka_prefix + '.' + object_type def test_storage_play( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): """Optimal replayer scenario. 
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 8c0bd2b..6157617 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,416 +1,417 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import functools
import logging
import random
from subprocess import Popen
from typing import Dict, List, Tuple

import dateutil
import pytest
from confluent_kafka import Producer
from hypothesis import strategies, given, settings

from swh.storage import get_storage

from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
from swh.model.hashutil import hash_to_hex
from swh.model.model import Content

from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter


storage_config = {
    'cls': 'pipeline',
    'steps': [
        {'cls': 'memory'},
    ]
}


def make_topic(kafka_prefix: str, object_type: str) -> str:
    return kafka_prefix + '.' + object_type


def test_storage_play(
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog):
    """Optimal replayer scenario.

    This:
    - writes objects to the topic
    - replayer consumes objects from the topic and replays them
    """
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    storage = get_storage(**storage_config)

    producer = Producer({
        'bootstrap.servers': 'localhost:{}'.format(port),
        'client.id': 'test producer',
        'acks': 'all',
    })

    now = datetime.datetime.now(tz=datetime.timezone.utc)

    # Fill Kafka
    nb_sent = 0
    nb_visits = 0
    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
        topic = make_topic(kafka_prefix, object_type)
        for object_ in objects:
            key = bytes(random.randint(0, 255) for _ in range(40))
            object_ = object_.copy()
            if object_type == 'content':
                object_['ctime'] = now
            elif object_type == 'origin_visit':
                nb_visits += 1
                object_['visit'] = nb_visits
            producer.produce(
                topic=topic, key=key_to_kafka(key),
                value=value_to_kafka(object_),
            )
            nb_sent += 1

    producer.flush()

    caplog.set_level(logging.ERROR, 'swh.journal.replay')
    # Fill the storage from Kafka
    replayer = JournalClient(
        brokers='localhost:%d' % kafka_server[1],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
    )
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    nb_inserted = 0
    while nb_inserted < nb_sent:
        nb_inserted += replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # Check the objects were actually inserted in the storage
    assert OBJECT_TYPE_KEYS['revision'][1] == \
        list(storage.revision_get(
            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
    assert OBJECT_TYPE_KEYS['release'][1] == \
        list(storage.release_get(
            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

    origins = list(storage.origin_get(
        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
    assert OBJECT_TYPE_KEYS['origin'][1] == \
        [{'url': orig['url']} for orig in origins]
    for origin in origins:
        origin_url = origin['url']
        expected_visits = [
            {
                **visit,
                'origin': origin_url,
                'date': dateutil.parser.parse(visit['date']),
            }
            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
            if visit['origin'] == origin['url']
        ]
        actual_visits = list(storage.origin_visit_get(
            origin_url))
        for visit in actual_visits:
            del visit['visit']  # opaque identifier
        assert expected_visits == actual_visits

    input_contents = OBJECT_TYPE_KEYS['content'][1]
    contents = storage.content_get_metadata(
        [cont['sha1'] for cont in input_contents])
    assert len(contents) == len(input_contents)
    assert contents == {cont['sha1']: [cont] for cont in input_contents}

    collision = 0
    for record in caplog.records:
        logtext = record.getMessage()
        if 'Colliding contents:' in logtext:
            collision += 1

    assert collision == 0, "No collision should be detected"


def test_storage_play_with_collision(
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog):
    """Another replayer scenario with collisions.
    This:
    - writes objects to the topic, including colliding contents
    - replayer consumes objects from the topic and replays them
    - colliding contents are dropped from the replay when detected
    """
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    storage = get_storage(**storage_config)

    producer = Producer({
        'bootstrap.servers': 'localhost:{}'.format(port),
        'client.id': 'test producer',
        'enable.idempotence': 'true',
    })

    now = datetime.datetime.now(tz=datetime.timezone.utc)

    # Fill Kafka
    nb_sent = 0
    nb_visits = 0
    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
        topic = make_topic(kafka_prefix, object_type)
        for object_ in objects:
            key = bytes(random.randint(0, 255) for _ in range(40))
            object_ = object_.copy()
            if object_type == 'content':
                object_['ctime'] = now
            elif object_type == 'origin_visit':
                nb_visits += 1
                object_['visit'] = nb_visits
            producer.produce(
                topic=topic, key=key_to_kafka(key),
                value=value_to_kafka(object_),
            )
            nb_sent += 1

    # Create a collision in the input data; the colliding contents are not
    # written to the destination storage.
    for content in DUPLICATE_CONTENTS:
        topic = make_topic(kafka_prefix, 'content')
        producer.produce(
            topic=topic, key=key_to_kafka(key),
            value=value_to_kafka(content),
        )
        nb_sent += 1

    producer.flush()

    caplog.set_level(logging.ERROR, 'swh.journal.replay')
    # Fill the storage from Kafka
    replayer = JournalClient(
        brokers='localhost:%d' % kafka_server[1],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
    )
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    nb_inserted = 0
    while nb_inserted < nb_sent:
        nb_inserted += replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # Check the objects were actually inserted in the storage
    assert OBJECT_TYPE_KEYS['revision'][1] == \
        list(storage.revision_get(
            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
    assert OBJECT_TYPE_KEYS['release'][1] == \
        list(storage.release_get(
            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

    origins = list(storage.origin_get(
        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
    assert OBJECT_TYPE_KEYS['origin'][1] == \
        [{'url': orig['url']} for orig in origins]
    for origin in origins:
        origin_url = origin['url']
        expected_visits = [
            {
                **visit,
                'origin': origin_url,
                'date': dateutil.parser.parse(visit['date']),
            }
            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
            if visit['origin'] == origin['url']
        ]
        actual_visits = list(storage.origin_visit_get(
            origin_url))
        for visit in actual_visits:
            del visit['visit']  # opaque identifier
        assert expected_visits == actual_visits

    input_contents = OBJECT_TYPE_KEYS['content'][1]
    contents = storage.content_get_metadata(
        [cont['sha1'] for cont in input_contents])
    assert len(contents) == len(input_contents)
    assert contents == {cont['sha1']: [cont] for cont in input_contents}

    nb_collisions = 0
    actual_collision: Dict
    for record in caplog.records:
        logtext = record.getMessage()
        if 'Collision detected:' in logtext:
            nb_collisions += 1
            actual_collision = record.args['collision']

    assert nb_collisions == 1, "1 collision should be detected"

    algo = 'sha1'
    assert actual_collision['algo'] == algo
    expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
    assert actual_collision['hash'] == expected_colliding_hash

    actual_colliding_hashes = actual_collision['objects']
    assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
    for content in DUPLICATE_CONTENTS:
        expected_content_hashes = {
            k: hash_to_hex(v)
            for k, v in Content.from_dict(content).hashes().items()
        }
        assert expected_content_hashes in actual_colliding_hashes
def _test_write_replay_origin_visit(visits: List[Dict]):
    """Helper function to write tests for origin_visit.

    Each visit (a dict) given in the 'visits' argument will be sent to a
    (mocked) kafka queue, which an in-memory-storage-backed replayer is
    listening to.

    Check that the corresponding origin visit entities are present in the
    storage and have the correct values if they are not skipped.

    """
    queue: List = []
    replayer = MockedJournalClient(queue)
    writer = MockedKafkaWriter(queue)

    # Note that flipping the order of these two insertions will crash
    # the test, because the legacy origin format does not allow creating
    # the origin when needed (its type is missing)
    writer.send('origin', 'foo', {
        'url': 'http://example.com/',
+        'type': 'git',
    })
    for visit in visits:
        writer.send('origin_visit', 'foo', visit)

    queue_size = len(queue)
    assert replayer.stop_after_objects is None
    replayer.stop_after_objects = queue_size

    storage = get_storage(**storage_config)
    worker_fn = functools.partial(process_replay_objects, storage=storage)

    replayer.process(worker_fn)

    actual_visits = list(storage.origin_visit_get('http://example.com/'))

    assert len(actual_visits) == len(visits), actual_visits

    for vin, vout in zip(visits, actual_visits):
        vin = vin.copy()
        vout = vout.copy()
        assert vout.pop('origin') == 'http://example.com/'
        vin.pop('origin')
        vin.setdefault('type', 'git')
        vin.setdefault('metadata', None)
        assert vin == vout


def test_write_replay_origin_visit():
    """Test origin_visit when the 'origin' is just a string."""
    now = datetime.datetime.now()
    visits = [{
        'visit': 1,
        'origin': 'http://example.com/',
        'date': now,
        'type': 'git',
        'status': 'partial',
        'snapshot': None,
    }]
    _test_write_replay_origin_visit(visits)


def test_write_replay_legacy_origin_visit1():
    """Origin_visit with no type should make the replayer crash.

    We expect the journal's origin_visit topic to no longer reference such
    visits. If it does, the replayer must crash so we can fix the journal's
    topic.

    """
    now = datetime.datetime.now()
    visit = {
        'visit': 1,
        'origin': 'http://example.com/',
        'date': now,
        'status': 'partial',
        'snapshot': None,
    }
    now2 = datetime.datetime.now()
    visit2 = {
        'visit': 2,
        'origin': {'url': 'http://example.com/'},
        'date': now2,
        'status': 'partial',
        'snapshot': None,
    }

    for origin_visit in [visit, visit2]:
        with pytest.raises(ValueError, match='Old origin visit format'):
            _test_write_replay_origin_visit([origin_visit])


def test_write_replay_legacy_origin_visit2():
    """Test origin_visit when 'type' is missing from the visit, but not
    from the origin."""
    now = datetime.datetime.now()
    visits = [{
        'visit': 1,
        'origin': {
            'url': 'http://example.com/',
            'type': 'git',
        },
        'date': now,
        'type': 'git',
        'status': 'partial',
        'snapshot': None,
    }]
    _test_write_replay_origin_visit(visits)


def test_write_replay_legacy_origin_visit3():
    """Test origin_visit when the origin is a dict"""
    now = datetime.datetime.now()
    visits = [{
        'visit': 1,
        'origin': {
            'url': 'http://example.com/',
        },
        'date': now,
        'type': 'git',
        'status': 'partial',
        'snapshot': None,
    }]
    _test_write_replay_origin_visit(visits)


hash_strategy = strategies.binary(min_size=20, max_size=20)


@settings(max_examples=500)
@given(strategies.sets(hash_strategy, min_size=0, max_size=500),
       strategies.sets(hash_strategy, min_size=10))
def test_is_hash_in_bytearray(haystack, needles):
    array = b''.join(sorted(haystack))
    needles |= haystack  # Exhaustively test for all objects in the array
    for needle in needles:
        assert is_hash_in_bytearray(needle, array, len(haystack)) == \
            (needle in haystack)
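
Review note: the legacy-visit handling that `_test_write_replay_origin_visit` relies on can also be checked directly against `fix_objects`, without Kafka or a storage backend. Below is a minimal sketch, assuming only the `swh.journal.fixer` module from this patch; the URL and date values are made up for illustration.

    import datetime

    from swh.journal.fixer import fix_objects

    date = datetime.datetime(2020, 2, 27, 14, 39, 19,
                             tzinfo=datetime.timezone.utc)

    # Legacy visit: 'type' is only available on the embedded origin dict.
    [visit] = fix_objects('origin_visit', [{
        'origin': {'url': 'http://example.com/', 'type': 'hg'},  # hypothetical sample
        'date': date,
        'status': 'ongoing',
        'snapshot': None,
    }])
    assert visit == {
        'origin': 'http://example.com/',
        'type': 'hg',
        'date': date,
        'status': 'ongoing',
        'snapshot': None,
        'metadata': None,
    }

    # Truly old format (no type anywhere): the fixer raises so the replayer
    # crashes instead of silently writing a broken visit.
    try:
        fix_objects('origin_visit', [{
            'origin': 'http://example.com/',
            'date': date,
            'status': 'ongoing',
            'snapshot': None,
        }])
    except ValueError as exc:
        assert 'Old origin visit format detected' in str(exc)
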