Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9343806
test_tasks.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
test_tasks.py
View Options
# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
uuid
import
pytest
from
swh.scheduler.model
import
ListedOrigin
,
Lister
from
swh.scheduler.utils
import
create_origin_task_dict
@pytest.fixture
(
autouse
=
True
)
def
celery_worker_and_swh_config
(
swh_scheduler_celery_worker
,
swh_config
):
pass
@pytest.fixture
def
archive_lister
():
return
Lister
(
name
=
"archive-lister"
,
instance_name
=
"example"
,
id
=
uuid
.
uuid4
())
@pytest.fixture
def
archive_listed_origin
(
archive_lister
):
return
ListedOrigin
(
lister_id
=
archive_lister
.
id
,
url
=
"https://example.org/archives"
,
visit_type
=
"tar"
,
extra_loader_arguments
=
{
"artifacts"
:
[],
"snapshot_append"
:
True
,
},
)
def
test_tasks_archive_loader
(
mocker
,
swh_scheduler_celery_app
,
):
mock_load
=
mocker
.
patch
(
"swh.loader.package.archive.loader.ArchiveLoader.load"
)
mock_load
.
return_value
=
{
"status"
:
"eventful"
}
res
=
swh_scheduler_celery_app
.
send_task
(
"swh.loader.package.archive.tasks.LoadArchive"
,
kwargs
=
dict
(
url
=
"https://gnu.org/"
,
artifacts
=
[]),
)
assert
res
res
.
wait
()
assert
res
.
successful
()
assert
mock_load
.
called
assert
res
.
result
==
{
"status"
:
"eventful"
}
def
test_tasks_archive_loader_snapshot_append
(
mocker
,
swh_scheduler_celery_app
,
):
mock_load
=
mocker
.
patch
(
"swh.loader.package.archive.loader.ArchiveLoader.load"
)
mock_load
.
return_value
=
{
"status"
:
"eventful"
}
res
=
swh_scheduler_celery_app
.
send_task
(
"swh.loader.package.archive.tasks.LoadArchive"
,
kwargs
=
dict
(
url
=
"https://gnu.org/"
,
artifacts
=
[],
snapshot_append
=
True
),
)
assert
res
res
.
wait
()
assert
res
.
successful
()
assert
mock_load
.
called
assert
res
.
result
==
{
"status"
:
"eventful"
}
def
test_tasks_archive_loader_for_listed_origin
(
mocker
,
swh_scheduler_celery_app
,
archive_lister
,
archive_listed_origin
,
):
mock_load
=
mocker
.
patch
(
"swh.loader.package.archive.loader.ArchiveLoader.load"
)
mock_load
.
return_value
=
{
"status"
:
"eventful"
}
task_dict
=
create_origin_task_dict
(
archive_listed_origin
,
archive_lister
)
res
=
swh_scheduler_celery_app
.
send_task
(
"swh.loader.package.archive.tasks.LoadArchive"
,
kwargs
=
task_dict
[
"arguments"
][
"kwargs"
],
)
assert
res
res
.
wait
()
assert
res
.
successful
()
assert
mock_load
.
called
assert
res
.
result
==
{
"status"
:
"eventful"
}
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 1:52 PM (4 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295400
Attached To
rDLDBASE Generic VCS/Package Loader
Event Timeline
Log In to Comment