Page MenuHomeSoftware Heritage

test_tasks.py
No OneTemporary

test_tasks.py

# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import uuid
import pytest
from swh.scheduler.model import ListedOrigin, Lister
from swh.scheduler.utils import create_origin_task_dict
@pytest.fixture(autouse=True)
def celery_worker_and_swh_config(swh_scheduler_celery_worker, swh_config):
    """Request the celery worker and swh config fixtures for every test.

    The fixture is ``autouse`` and has no logic of its own: it only exists so
    that ``swh_scheduler_celery_worker`` and ``swh_config`` are requested by
    each test of this module without listing them explicitly.
    """
@pytest.fixture
def deposit_lister():
    """Build a ``Lister`` named "deposit-lister" with a fresh random id."""
    lister_id = uuid.uuid4()
    return Lister(
        name="deposit-lister",
        instance_name="example",
        id=lister_id,
    )
@pytest.fixture
def deposit_listed_origin(deposit_lister):
    """Build a "deposit" ``ListedOrigin`` attached to ``deposit_lister``.

    The origin carries a ``deposit_id`` entry in its extra loader arguments.
    """
    loader_arguments = {"deposit_id": "some-d-id"}
    return ListedOrigin(
        lister_id=deposit_lister.id,
        url="https://example.org/project",
        visit_type="deposit",
        extra_loader_arguments=loader_arguments,
    )
def test_tasks_deposit_loader(
    mocker,
    swh_scheduler_celery_app,
):
    """Send the LoadDeposit task with explicit kwargs and check its result.

    ``DepositLoader.from_configfile`` is patched so that it returns the mock
    itself, whose ``load()`` yields an "eventful" status; the task is then
    expected to succeed and to report that same status.
    """
    patch_target = "swh.loader.package.deposit.loader.DepositLoader.from_configfile"
    expected_result = {"status": "eventful"}

    loader_factory = mocker.patch(patch_target)
    # The patched factory hands back itself, so ``load`` is stubbed on it.
    loader_factory.return_value = loader_factory
    loader_factory.load.return_value = {"status": "eventful"}

    task_name = "swh.loader.package.deposit.tasks.LoadDeposit"
    task_kwargs = {
        "url": "some-url",
        "deposit_id": "some-d-id",
    }
    async_result = swh_scheduler_celery_app.send_task(task_name, kwargs=task_kwargs)
    assert async_result

    async_result.wait()
    assert async_result.successful()
    assert loader_factory.called
    assert async_result.result == expected_result
def test_tasks_deposit_loader_for_listed_origin(
    mocker,
    swh_scheduler_celery_app,
    deposit_lister,
    deposit_listed_origin,
):
    """Send the LoadDeposit task with kwargs derived from a listed origin.

    The task keyword arguments are taken from the dict produced by
    ``create_origin_task_dict`` for the listed origin and its lister.
    ``DepositLoader.from_configfile`` is patched so that it returns the mock
    itself, whose ``load()`` yields an "eventful" status; the task is then
    expected to succeed and to report that same status.
    """
    patch_target = "swh.loader.package.deposit.loader.DepositLoader.from_configfile"
    expected_result = {"status": "eventful"}

    loader_factory = mocker.patch(patch_target)
    # The patched factory hands back itself, so ``load`` is stubbed on it.
    loader_factory.return_value = loader_factory
    loader_factory.load.return_value = {"status": "eventful"}

    origin_task = create_origin_task_dict(deposit_listed_origin, deposit_lister)
    task_kwargs = origin_task["arguments"]["kwargs"]

    task_name = "swh.loader.package.deposit.tasks.LoadDeposit"
    async_result = swh_scheduler_celery_app.send_task(task_name, kwargs=task_kwargs)
    assert async_result

    async_result.wait()
    assert async_result.successful()
    assert loader_factory.called
    assert async_result.result == expected_result

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 11:41 AM (4 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3298016

Event Timeline