Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show All 25 Lines | |||||
from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter | from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter | ||||
from swh.journal.tests.conftest import ( | from swh.journal.tests.conftest import ( | ||||
TEST_OBJECT_DICTS, | TEST_OBJECT_DICTS, | ||||
DUPLICATE_CONTENTS, | DUPLICATE_CONTENTS, | ||||
) | ) | ||||
UTC = datetime.timezone.utc | |||||
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} | storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} | ||||
def test_storage_play( | def test_storage_play( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, | ||||
): | ): | ||||
"""Optimal replayer scenario. | """Optimal replayer scenario. | ||||
Show All 9 Lines | ): | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test producer", | "client.id": "test producer", | ||||
"acks": "all", | "acks": "all", | ||||
} | } | ||||
) | ) | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=UTC) | ||||
# Fill Kafka | # Fill Kafka | ||||
nb_sent = 0 | nb_sent = 0 | ||||
nb_visits = 0 | nb_visits = 0 | ||||
for object_type, objects in TEST_OBJECT_DICTS.items(): | for object_type, objects in TEST_OBJECT_DICTS.items(): | ||||
topic = f"{kafka_prefix}.{object_type}" | topic = f"{kafka_prefix}.{object_type}" | ||||
for object_ in objects: | for object_ in objects: | ||||
key = bytes(random.randint(0, 255) for _ in range(40)) | key = bytes(random.randint(0, 255) for _ in range(40)) | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | ): | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test producer", | "client.id": "test producer", | ||||
"enable.idempotence": "true", | "enable.idempotence": "true", | ||||
} | } | ||||
) | ) | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=UTC) | ||||
# Fill Kafka | # Fill Kafka | ||||
nb_sent = 0 | nb_sent = 0 | ||||
nb_visits = 0 | nb_visits = 0 | ||||
for object_type, objects in TEST_OBJECT_DICTS.items(): | for object_type, objects in TEST_OBJECT_DICTS.items(): | ||||
topic = f"{kafka_prefix}.{object_type}" | topic = f"{kafka_prefix}.{object_type}" | ||||
for object_ in objects: | for object_ in objects: | ||||
key = bytes(random.randint(0, 255) for _ in range(40)) | key = bytes(random.randint(0, 255) for _ in range(40)) | ||||
▲ Show 20 Lines • Show All 139 Lines • ▼ Show 20 Lines | for vin, vout in zip(visits, actual_visits): | ||||
vin.pop("origin") | vin.pop("origin") | ||||
vin.setdefault("type", "git") | vin.setdefault("type", "git") | ||||
vin.setdefault("metadata", None) | vin.setdefault("metadata", None) | ||||
assert vin == vout | assert vin == vout | ||||
def test_write_replay_origin_visit(): | def test_write_replay_origin_visit(): | ||||
"""Test origin_visit when the 'origin' is just a string.""" | """Test origin_visit when the 'origin' is just a string.""" | ||||
now = datetime.datetime.now() | now = datetime.datetime.now(tz=UTC) | ||||
visits = [ | visits = [ | ||||
{ | { | ||||
"visit": 1, | "visit": 1, | ||||
"origin": "http://example.com/", | "origin": "http://example.com/", | ||||
"date": now, | "date": now, | ||||
"type": "git", | "type": "git", | ||||
"status": "partial", | "status": "partial", | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
] | ] | ||||
_test_write_replay_origin_visit(visits) | _test_write_replay_origin_visit(visits) | ||||
def test_write_replay_legacy_origin_visit1(): | def test_write_replay_legacy_origin_visit1(): | ||||
"""Origin_visit with no types should make the replayer crash | """Origin_visit with no types should make the replayer crash | ||||
We expect the journal's origin_visit topic to no longer reference such | We expect the journal's origin_visit topic to no longer reference such | ||||
visits. If it does, the replayer must crash so we can fix the journal's | visits. If it does, the replayer must crash so we can fix the journal's | ||||
topic. | topic. | ||||
""" | """ | ||||
now = datetime.datetime.now() | now = datetime.datetime.now(tz=UTC) | ||||
visit = { | visit = { | ||||
"visit": 1, | "visit": 1, | ||||
"origin": "http://example.com/", | "origin": "http://example.com/", | ||||
"date": now, | "date": now, | ||||
"status": "partial", | "status": "partial", | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
now2 = datetime.datetime.now() | now2 = datetime.datetime.now(tz=UTC) | ||||
visit2 = { | visit2 = { | ||||
"visit": 2, | "visit": 2, | ||||
"origin": {"url": "http://example.com/"}, | "origin": {"url": "http://example.com/"}, | ||||
"date": now2, | "date": now2, | ||||
"status": "partial", | "status": "partial", | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
for origin_visit in [visit, visit2]: | for origin_visit in [visit, visit2]: | ||||
with pytest.raises(ValueError, match="Old origin visit format"): | with pytest.raises(ValueError, match="Old origin visit format"): | ||||
_test_write_replay_origin_visit([origin_visit]) | _test_write_replay_origin_visit([origin_visit]) | ||||
def test_write_replay_legacy_origin_visit2(): | def test_write_replay_legacy_origin_visit2(): | ||||
"""Test origin_visit when 'type' is missing from the visit, but not | """Test origin_visit when 'type' is missing from the visit, but not | ||||
from the origin.""" | from the origin.""" | ||||
now = datetime.datetime.now() | now = datetime.datetime.now(tz=UTC) | ||||
visits = [ | visits = [ | ||||
{ | { | ||||
"visit": 1, | "visit": 1, | ||||
"origin": {"url": "http://example.com/", "type": "git",}, | "origin": {"url": "http://example.com/", "type": "git",}, | ||||
"date": now, | "date": now, | ||||
"type": "git", | "type": "git", | ||||
"status": "partial", | "status": "partial", | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
] | ] | ||||
_test_write_replay_origin_visit(visits) | _test_write_replay_origin_visit(visits) | ||||
def test_write_replay_legacy_origin_visit3(): | def test_write_replay_legacy_origin_visit3(): | ||||
"""Test origin_visit when the origin is a dict""" | """Test origin_visit when the origin is a dict""" | ||||
now = datetime.datetime.now() | now = datetime.datetime.now(tz=UTC) | ||||
visits = [ | visits = [ | ||||
{ | { | ||||
"visit": 1, | "visit": 1, | ||||
"origin": {"url": "http://example.com/",}, | "origin": {"url": "http://example.com/",}, | ||||
"date": now, | "date": now, | ||||
"type": "git", | "type": "git", | ||||
"status": "partial", | "status": "partial", | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
] | ] | ||||
_test_write_replay_origin_visit(visits) | _test_write_replay_origin_visit(visits) |