Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show First 20 Lines • Show All 640 Lines • ▼ Show 20 Lines | def test_get_or_create_lister(self, swh_scheduler): | ||||
assert lister.instance_name == lister_args.get("instance_name", "") | assert lister.instance_name == lister_args.get("instance_name", "") | ||||
lister_get_again = swh_scheduler.get_or_create_lister( | lister_get_again = swh_scheduler.get_or_create_lister( | ||||
lister.name, lister.instance_name | lister.name, lister.instance_name | ||||
) | ) | ||||
assert lister == lister_get_again | assert lister == lister_get_again | ||||
def test_get_lister(self, swh_scheduler): | |||||
for lister_args in LISTERS: | |||||
assert swh_scheduler.get_lister(**lister_args) is None | |||||
db_listers = [] | |||||
for lister_args in LISTERS: | |||||
db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) | |||||
for lister, lister_args in zip(db_listers, LISTERS): | |||||
lister_get_again = swh_scheduler.get_lister( | |||||
lister.name, lister.instance_name | |||||
) | |||||
assert lister == lister_get_again | |||||
def test_update_lister(self, swh_scheduler, stored_lister): | def test_update_lister(self, swh_scheduler, stored_lister): | ||||
lister = attr.evolve(stored_lister, current_state={"updated": "now"}) | lister = attr.evolve(stored_lister, current_state={"updated": "now"}) | ||||
updated_lister = swh_scheduler.update_lister(lister) | updated_lister = swh_scheduler.update_lister(lister) | ||||
assert updated_lister.updated > lister.updated | assert updated_lister.updated > lister.updated | ||||
assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) | assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) | ||||
▲ Show 20 Lines • Show All 489 Lines • Show Last 20 Lines |