Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import random | import random | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Dict, Tuple | from typing import Dict, List, Tuple | ||||
import dateutil | import dateutil | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
from hypothesis import strategies, given, settings | from hypothesis import strategies, given, settings | ||||
import pytest | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray | from swh.journal.replay import process_replay_objects, is_hash_in_bytearray | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import Content | from swh.model.model import Content | ||||
▲ Show 20 Lines • Show All 246 Lines • ▼ Show 20 Lines | def test_storage_play_with_collision( | ||||
for content in DUPLICATE_CONTENTS: | for content in DUPLICATE_CONTENTS: | ||||
expected_content_hashes = { | expected_content_hashes = { | ||||
k: hash_to_hex(v) | k: hash_to_hex(v) | ||||
for k, v in Content.from_dict(content).hashes().items() | for k, v in Content.from_dict(content).hashes().items() | ||||
} | } | ||||
assert expected_content_hashes in actual_colliding_hashes | assert expected_content_hashes in actual_colliding_hashes | ||||
def _test_write_replay_origin_visit(
        visits: List[Dict], with_skipped_visits: bool = False):
    """Replay origin visits through a mocked journal and check the storage.

    An origin message is queued first, then every dict of 'visits' is queued
    as an 'origin_visit' message on a (mocked) kafka queue. A mocked journal
    client then replays the whole queue into the storage built from
    'storage_config'.

    When 'with_skipped_visits' is true, the storage is expected to hold no
    visit at all for the origin. Otherwise the storage must hold one visit
    per input visit, each with the same values as the one that was sent.
    """
    origin_url = 'http://example.com/'
    messages: List = []
    journal_client = MockedJournalClient(messages)
    kafka_writer = MockedKafkaWriter(messages)

    # The origin has to be queued before its visits: swapping the two
    # insertions crashes the test, because the legacy origin format cannot
    # create the origin on demand (its type is missing).
    kafka_writer.send('origin', 'foo', {'url': origin_url, 'type': 'git'})
    for visit in visits:
        kafka_writer.send('origin_visit', 'foo', visit)

    # Make the replayer stop once every queued message has been consumed.
    assert journal_client.stop_after_objects is None
    journal_client.stop_after_objects = len(messages)

    storage = get_storage(**storage_config)
    journal_client.process(
        functools.partial(process_replay_objects, storage=storage))

    stored_visits = list(storage.origin_visit_get(origin_url))

    if with_skipped_visits:
        assert len(stored_visits) == 0, "Old format visits are dropped"
        return

    assert len(stored_visits) == len(visits), stored_visits

    for sent, stored in zip(visits, stored_visits):
        expected = sent.copy()
        actual = stored.copy()
        assert actual.pop('origin') == origin_url
        # The origin was checked above; the remaining fields are compared
        # after filling in the values the test expects for absent keys.
        expected.pop('origin')
        expected.setdefault('type', 'git')
        expected.setdefault('metadata', None)
        assert expected == actual
def test_write_replay_origin_visit():
    """Replay a single visit whose 'origin' field is a plain URL string."""
    visit = {
        'visit': 1,
        'origin': 'http://example.com/',
        'date': datetime.datetime.now(),
        'type': 'git',
        'status': 'partial',
        'snapshot': None,
    }
    _test_write_replay_origin_visit([visit])
def test_write_replay_legacy_origin_visit1(): | def test_write_replay_legacy_origin_visit1(): | ||||
"""Test origin_visit when there is no type.""" | """Origin_visit with no types should be skipped when replayed | ||||
""" | |||||
ardumont: should go away. | |||||
now = datetime.datetime.now() | now = datetime.datetime.now() | ||||
visits = [{ | visit = { | ||||
'visit': 1, | 'visit': 1, | ||||
'origin': 'http://example.com/', | 'origin': 'http://example.com/', | ||||
'date': now, | 'date': now, | ||||
'status': 'partial', | 'status': 'partial', | ||||
'snapshot': None, | 'snapshot': None, | ||||
}] | } | ||||
with pytest.raises(ValueError, match='too old'): | now2 = datetime.datetime.now() | ||||
_test_write_replay_origin_visit(visits) | visit2 = { | ||||
'visit': 2, | |||||
'origin': {'url': 'http://example.com/'}, | |||||
'date': now2, | |||||
'status': 'partial', | |||||
'snapshot': None, | |||||
Not Done Inline Actions: For consistency, you should renumber test_write_replay_legacy_origin_visit2 and test_write_replay_legacy_origin_visit3, and add a new test_write_replay_legacy_origin_visit2 test for this case. vlorentz: For consistency, you should renumber `test_write_replay_legacy_origin_visit2` and… | |||||
Not Done Inline Actions: Or maybe renumber differently, depending on the chronology. I don't see where the format you're introducing fits in the timeline. vlorentz: or maybe renumber differently, depending on the chronology. I don't see where the format you're… | |||||
} | |||||
Done Inline Actions: I wanted to use the caplog stanza here. So, I'll keep that improvement for later.
    visits = [visit, visit2]
    caplog.set_level(logging.ERROR, 'swh.journal.replay')
    _test_write_replay_origin_visit(visits, with_skipped_visits=True)
    nb_visits_skipped = 0
    skipped_visits = []
    for record in caplog.records:
        logtext = record.getMessage()
        if 'Old origin visit format detected:' in logtext:
            nb_visits_skipped += 1
            skipped_visits.append(record.args['visit'])
    assert nb_visits_skipped == len(visits)
    for visit in visits:
        assert visit in skipped_visits
ardumont: I wanted to use the caplog stanza here.
But cannot really as those tests use mocked replayer... | |||||
_test_write_replay_origin_visit([visit, visit2], with_skipped_visits=True) | |||||
def test_write_replay_legacy_origin_visit2(): | def test_write_replay_legacy_origin_visit2(): | ||||
"""Test origin_visit when 'type' is missing from the visit, but not | """Test origin_visit when 'type' is missing from the visit, but not | ||||
from the origin.""" | from the origin.""" | ||||
now = datetime.datetime.now() | now = datetime.datetime.now() | ||||
visits = [{ | visits = [{ | ||||
'visit': 1, | 'visit': 1, | ||||
Show All 40 Lines |
should go away.