Page MenuHomeSoftware Heritage

test_tasks.py
No OneTemporary

test_tasks.py

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
def test_task_origin_metadata(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.OriginMetadataIndexer.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.OriginMetadata", args=["origin-url"],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_ctags(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.CtagsIndexer.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task("swh.indexer.tasks.Ctags", args=["id0"],)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_fossology_license(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.FossologyLicenseIndexer.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.ContentFossologyLicense", args=["id0"],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_recompute_checksums(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.RecomputeChecksums.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.RecomputeChecksums", args=[[{"blake2b256": "id"}]],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_mimetype(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.MimetypeIndexer.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.ContentMimetype", args=["id0"],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_mimetype_partition(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch("swh.indexer.tasks.MimetypePartitionIndexer.run")
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.ContentMimetypePartition", args=[0, 4],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
def test_task_license_partition(
mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config
):
mock_indexer = mocker.patch(
"swh.indexer.tasks.FossologyLicensePartitionIndexer.run"
)
mock_indexer.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.indexer.tasks.ContentFossologyLicensePartition", args=[0, 4],
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}

File Metadata

Mime Type
text/plain
Expires
Wed, Jun 4, 7:23 PM (5 d, 14 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3296146

Event Timeline