Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/archive/tests/test_tasks.py
Show All 13 Lines | res = swh_scheduler_celery_app.send_task( | ||||
"swh.loader.package.archive.tasks.LoadArchive", | "swh.loader.package.archive.tasks.LoadArchive", | ||||
kwargs=dict(url="https://gnu.org/", artifacts=[]), | kwargs=dict(url="https://gnu.org/", artifacts=[]), | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert mock_load.called | assert mock_load.called | ||||
assert res.result == {"status": "eventful"} | assert res.result == {"status": "eventful"} | ||||
def test_tasks_archive_loader_snapshot_append( | |||||
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config | |||||
): | |||||
mock_load = mocker.patch("swh.loader.package.archive.loader.ArchiveLoader.load") | |||||
mock_load.return_value = {"status": "eventful"} | |||||
res = swh_scheduler_celery_app.send_task( | |||||
"swh.loader.package.archive.tasks.LoadArchive", | |||||
kwargs=dict(url="https://gnu.org/", artifacts=[], snapshot_append=True), | |||||
) | |||||
assert res | |||||
res.wait() | |||||
assert res.successful() | |||||
assert mock_load.called | |||||
assert res.result == {"status": "eventful"} |