Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show First 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | for origin in origins: | ||||
assert expected_visits == actual_visits | assert expected_visits == actual_visits | ||||
contents = list(storage.content_get_metadata( | contents = list(storage.content_get_metadata( | ||||
[cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) | [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]])) | ||||
assert None not in contents | assert None not in contents | ||||
assert contents == OBJECT_TYPE_KEYS['content'][1] | assert contents == OBJECT_TYPE_KEYS['content'][1] | ||||
def test_write_replay_legacy_origin_visit1(): | def _test_write_replay_origin_visit(visits): | ||||
vlorentz: The docstring should explain what it does | |||||
"""Test origin_visit when the 'origin' is just a string.""" | """Helper function to write tests for origin_visit. | ||||
Each visit (a dict) given in the 'visits' argument will be sent to | |||||
a (mocked) kafka queue, which an in-memory-storage backed replayer is | |||||
listening to. | |||||
Check that corresponding origin visit entities are present in the storage | |||||
and have correct values. | |||||
""" | |||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
writer = MockedKafkaWriter(queue) | writer = MockedKafkaWriter(queue) | ||||
# Note that flipping the order of these two insertions will crash | # Note that flipping the order of these two insertions will crash | ||||
# the test, because the legacy origin_format does not allow to create | # the test, because the legacy origin_format does not allow to create | ||||
# the origin when needed (type is missing) | # the origin when needed (type is missing) | ||||
now = datetime.datetime.now() | |||||
writer.send('origin', 'foo', { | writer.send('origin', 'foo', { | ||||
'url': 'http://example.com/', | 'url': 'http://example.com/', | ||||
'type': 'git', | 'type': 'git', | ||||
}) | }) | ||||
writer.send('origin_visit', 'foo', { | for visit in visits: | ||||
'visit': 1, | writer.send('origin_visit', 'foo', visit) | ||||
'origin': 'http://example.com/', | |||||
'date': now, | |||||
}) | |||||
queue_size = sum(len(partition) | queue_size = sum(len(partition) | ||||
for batch in queue | for batch in queue | ||||
for partition in batch.values()) | for partition in batch.values()) | ||||
storage = get_storage('memory', {}) | storage = get_storage('memory', {}) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while nb_messages < queue_size: | while nb_messages < queue_size: | ||||
nb_messages += replayer.process(worker_fn) | nb_messages += replayer.process(worker_fn) | ||||
visits = list(storage.origin_visit_get('http://example.com/')) | actual_visits = list(storage.origin_visit_get('http://example.com/')) | ||||
Not Done Inline Actionsassert len(visitsout) == len(visits), visitsout vlorentz: `assert len(visitsout) == len(visits), visitsout` | |||||
assert len(actual_visits) == len(visits), actual_visits | |||||
Not Done Inline Actionsrename visits to expected_visits and visitsout to visits or actual_visits, for consistency with other tests. vlorentz: rename `visits` to `expected_visits` and `visitsout` to `visits` or `actual_visits`, for… | |||||
for vin, vout in zip(visits, actual_visits): | |||||
vin = vin.copy() | |||||
vout = vout.copy() | |||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
assert visits == [{ | assert vout.pop('origin') == 1 | ||||
'visit': 1, | |||||
'origin': 1, | |||||
'date': now, | |||||
}] | |||||
else: | else: | ||||
assert visits == [{ | assert vout.pop('origin') == 'http://example.com/' | ||||
Not Done Inline Actionsredundant vlorentz: redundant | |||||
Done Inline Actionsnope, vin vs vout douardda: nope, `vin` vs `vout` | |||||
Not Done Inline ActionsIndeed. But why did you remove it, then? vlorentz: Indeed. But why did you remove it, then? | |||||
Done Inline Actionshumm not sure any more. let me dig this a bit further douardda: humm not sure any more. let me dig this a bit further | |||||
Done Inline Actionsoh yes I do remember, because I do not want to handle/check all the different possible ways this 'origin' has been sent into kafka (str vs. int vs dict). I believe... douardda: oh yes I do remember, because I do not want to handle/check all the different possible ways… | |||||
vin.pop('origin') | |||||
vin.setdefault('type', 'git') | |||||
assert vin == vout | |||||
def test_write_replay_legacy_origin_visit1(): | |||||
"""Test origin_visit when the 'origin' is just a string.""" | |||||
now = datetime.datetime.now() | |||||
visits = [{ | |||||
'visit': 1, | 'visit': 1, | ||||
'origin': 'http://example.com/', | 'origin': 'http://example.com/', | ||||
'date': now, | 'date': now, | ||||
'type': 'hg' | |||||
}] | }] | ||||
_test_write_replay_origin_visit(visits) | |||||
def test_write_replay_legacy_origin_visit2(): | def test_write_replay_legacy_origin_visit2(): | ||||
"""Test origin_visit when 'type' is missing.""" | """Test origin_visit when 'type' is missing.""" | ||||
queue = [] | |||||
replayer = MockedJournalClient(queue) | |||||
writer = MockedKafkaWriter(queue) | |||||
now = datetime.datetime.now() | now = datetime.datetime.now() | ||||
writer.send('origin', 'foo', { | visits = [{ | ||||
'url': 'http://example.com/', | |||||
'type': 'git', | |||||
}) | |||||
writer.send('origin_visit', 'foo', { | |||||
'visit': 1, | 'visit': 1, | ||||
'origin': { | 'origin': { | ||||
'url': 'http://example.com/', | 'url': 'http://example.com/', | ||||
'type': 'git', | 'type': 'git', | ||||
}, | }, | ||||
'date': now, | 'date': now, | ||||
}) | |||||
queue_size = sum(len(partition) | |||||
for batch in queue | |||||
for partition in batch.values()) | |||||
storage = get_storage('memory', {}) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | |||||
nb_messages = 0 | |||||
while nb_messages < queue_size: | |||||
nb_messages += replayer.process(worker_fn) | |||||
visits = list(storage.origin_visit_get('http://example.com/')) | |||||
if ENABLE_ORIGIN_IDS: | |||||
assert visits == [{ | |||||
'visit': 1, | |||||
'origin': 1, | |||||
'date': now, | |||||
'type': 'git', | |||||
}] | |||||
else: | |||||
assert visits == [{ | |||||
'visit': 1, | |||||
'origin': 'http://example.com/', | |||||
'date': now, | |||||
'type': 'git', | |||||
}] | }] | ||||
_test_write_replay_origin_visit(visits) | |||||
hash_strategy = strategies.binary(min_size=20, max_size=20) | hash_strategy = strategies.binary(min_size=20, max_size=20) | ||||
@settings(max_examples=500) | @settings(max_examples=500) | ||||
@given(strategies.sets(hash_strategy, min_size=0, max_size=500), | @given(strategies.sets(hash_strategy, min_size=0, max_size=500), | ||||
strategies.sets(hash_strategy, min_size=10)) | strategies.sets(hash_strategy, min_size=10)) | ||||
def test_is_hash_in_bytearray(haystack, needles): | def test_is_hash_in_bytearray(haystack, needles): | ||||
array = b''.join(sorted(haystack)) | array = b''.join(sorted(haystack)) | ||||
needles |= haystack # Exhaustively test for all objects in the array | needles |= haystack # Exhaustively test for all objects in the array | ||||
for needle in needles: | for needle in needles: | ||||
assert is_hash_in_bytearray(needle, array, len(haystack)) == \ | assert is_hash_in_bytearray(needle, array, len(haystack)) == \ | ||||
(needle in haystack) | (needle in haystack) |
The docstring should explain what it does