Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/tests/test_tasks.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
def test_loader(mocker, swh_app, celery_session_worker): | def test_loader(mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker): | ||||
mock_loader = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.load") | mock_loader = mocker.patch("swh.loader.mercurial.loader.HgBundle20Loader.load") | ||||
mock_loader.return_value = {"status": "eventful"} | mock_loader.return_value = {"status": "eventful"} | ||||
res = swh_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.loader.mercurial.tasks.LoadMercurial", | "swh.loader.mercurial.tasks.LoadMercurial", | ||||
kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",}, | kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",}, | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == {"status": "eventful"} | assert res.result == {"status": "eventful"} | ||||
mock_loader.assert_called_once_with() | mock_loader.assert_called_once_with() | ||||
def test_archive_loader(mocker, swh_app, celery_session_worker): | def test_archive_loader(mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker): | ||||
mock_loader = mocker.patch( | mock_loader = mocker.patch( | ||||
"swh.loader.mercurial.loader.HgArchiveBundle20Loader.load" | "swh.loader.mercurial.loader.HgArchiveBundle20Loader.load" | ||||
) | ) | ||||
mock_loader.return_value = {"status": "uneventful"} | mock_loader.return_value = {"status": "uneventful"} | ||||
res = swh_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.loader.mercurial.tasks.LoadArchiveMercurial", | "swh.loader.mercurial.tasks.LoadArchiveMercurial", | ||||
kwargs={ | kwargs={ | ||||
"url": "another_url", | "url": "another_url", | ||||
"archive_path": "/some/tar.tgz", | "archive_path": "/some/tar.tgz", | ||||
"visit_date": "now", | "visit_date": "now", | ||||
}, | }, | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == {"status": "uneventful"} | assert res.result == {"status": "uneventful"} | ||||
mock_loader.assert_called_once_with() | mock_loader.assert_called_once_with() |