Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8392874
test_backend.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
test_backend.py
View Options
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
contextlib
import
datetime
from
unittest.mock
import
MagicMock
,
patch
import
attr
import
psycopg2
import
pytest
from
swh.model.model
import
Content
from
swh.model.swhids
import
CoreSWHID
from
swh.vault.exc
import
NotFoundExc
from
swh.vault.tests.vault_testing
import
hash_content
@contextlib.contextmanager
def
mock_cooking
(
vault_backend
):
with
patch
.
object
(
vault_backend
,
"_send_task"
)
as
mt
:
mt
.
return_value
=
42
with
patch
(
"swh.vault.backend.get_cooker_cls"
)
as
mg
:
mcc
=
MagicMock
()
mc
=
MagicMock
()
mg
.
return_value
=
mcc
mcc
.
return_value
=
mc
mc
.
check_exists
.
return_value
=
True
yield
{
"_send_task"
:
mt
,
"get_cooker_cls"
:
mg
,
"cooker_cls"
:
mcc
,
"cooker"
:
mc
,
}
def
assertTimestampAlmostNow
(
ts
,
tolerance_secs
=
1.0
):
# noqa
now
=
datetime
.
datetime
.
now
(
datetime
.
timezone
.
utc
)
creation_delta_secs
=
(
ts
-
now
)
.
total_seconds
()
assert
creation_delta_secs
<
tolerance_secs
def
fake_cook
(
backend
,
bundle_type
,
result_content
,
sticky
=
False
):
swhid
=
Content
.
from_data
(
result_content
)
.
swhid
()
content
,
obj_id
=
hash_content
(
result_content
)
with
mock_cooking
(
backend
):
backend
.
create_task
(
bundle_type
,
swhid
,
sticky
)
backend
.
cache
.
add
(
bundle_type
,
swhid
,
b
"content"
)
backend
.
set_status
(
bundle_type
,
swhid
,
"done"
)
return
swhid
,
content
def
fail_cook
(
backend
,
bundle_type
,
swhid
,
failure_reason
):
with
mock_cooking
(
backend
):
backend
.
create_task
(
bundle_type
,
swhid
)
backend
.
set_status
(
bundle_type
,
swhid
,
"failed"
)
backend
.
set_progress
(
bundle_type
,
swhid
,
failure_reason
)
TEST_TYPE
=
"gitfast"
TEST_SWHID
=
CoreSWHID
.
from_string
(
"swh:1:rev:4a4b9771542143cf070386f86b4b92d42966bdbc"
)
TEST_PROGRESS
=
(
"Mr. White, You're telling me you're cooking again?
\N{ASTONISHED FACE}
"
)
TEST_EMAIL
=
"ouiche@lorraine.fr"
@pytest.fixture
def
swh_vault
(
swh_vault
,
sample_data
):
# make the vault's storage consistent with test data
revision
=
attr
.
evolve
(
sample_data
.
revision
,
id
=
TEST_SWHID
.
object_id
)
swh_vault
.
storage
.
revision_add
([
revision
])
return
swh_vault
def
test_create_task_simple
(
swh_vault
):
with
mock_cooking
(
swh_vault
)
as
m
:
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
m
[
"get_cooker_cls"
]
.
assert_called_once_with
(
TEST_TYPE
,
TEST_SWHID
.
object_type
)
args
=
m
[
"cooker_cls"
]
.
call_args
[
0
]
assert
args
[
0
]
==
TEST_SWHID
assert
m
[
"cooker"
]
.
check_exists
.
call_count
==
1
assert
m
[
"_send_task"
]
.
call_count
==
1
args
=
m
[
"_send_task"
]
.
call_args
[
0
]
assert
args
[
0
]
==
TEST_TYPE
assert
args
[
1
]
==
TEST_SWHID
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"swhid"
]
==
TEST_SWHID
assert
info
[
"type"
]
==
TEST_TYPE
assert
info
[
"task_status"
]
==
"new"
assert
info
[
"task_id"
]
==
42
assertTimestampAlmostNow
(
info
[
"ts_created"
])
assert
info
[
"ts_done"
]
is
None
assert
info
[
"progress_msg"
]
is
None
def
test_create_fail_duplicate_task
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
with
pytest
.
raises
(
psycopg2
.
IntegrityError
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
def
test_create_fail_nonexisting_object
(
swh_vault
):
with
mock_cooking
(
swh_vault
)
as
m
:
m
[
"cooker"
]
.
check_exists
.
side_effect
=
ValueError
(
"Nothing here."
)
with
pytest
.
raises
(
ValueError
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
def
test_create_set_progress
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"progress_msg"
]
is
None
swh_vault
.
set_progress
(
TEST_TYPE
,
TEST_SWHID
,
TEST_PROGRESS
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"progress_msg"
]
==
TEST_PROGRESS
def
test_create_set_status
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"task_status"
]
==
"new"
assert
info
[
"ts_done"
]
is
None
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"pending"
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"task_status"
]
==
"pending"
assert
info
[
"ts_done"
]
is
None
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"done"
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"task_status"
]
==
"done"
assertTimestampAlmostNow
(
info
[
"ts_done"
])
def
test_create_update_access_ts
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
access_ts_1
=
info
[
"ts_last_access"
]
assertTimestampAlmostNow
(
access_ts_1
)
swh_vault
.
update_access_ts
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
access_ts_2
=
info
[
"ts_last_access"
]
assertTimestampAlmostNow
(
access_ts_2
)
swh_vault
.
update_access_ts
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
access_ts_3
=
info
[
"ts_last_access"
]
assertTimestampAlmostNow
(
access_ts_3
)
assert
access_ts_1
<
access_ts_2
assert
access_ts_2
<
access_ts_3
def
test_cook_idempotent
(
swh_vault
,
sample_data
):
with
mock_cooking
(
swh_vault
):
info1
=
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
)
info2
=
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
)
info3
=
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
)
assert
info1
==
info2
assert
info1
==
info3
def
test_cook_email_pending_done
(
swh_vault
):
with
mock_cooking
(
swh_vault
),
patch
.
object
(
swh_vault
,
"add_notif_email"
)
as
madd
,
patch
.
object
(
swh_vault
,
"send_notification"
)
as
msend
:
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
)
madd
.
assert_not_called
()
msend
.
assert_not_called
()
madd
.
reset_mock
()
msend
.
reset_mock
()
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
,
email
=
TEST_EMAIL
)
madd
.
assert_called_once_with
(
TEST_TYPE
,
TEST_SWHID
,
TEST_EMAIL
)
msend
.
assert_not_called
()
madd
.
reset_mock
()
msend
.
reset_mock
()
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"done"
)
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
,
email
=
TEST_EMAIL
)
msend
.
assert_called_once_with
(
None
,
TEST_EMAIL
,
TEST_TYPE
,
TEST_SWHID
,
"done"
)
madd
.
assert_not_called
()
def
test_send_all_emails
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
emails
=
(
"a@example.com"
,
"billg@example.com"
,
"test+42@example.org"
)
for
email
in
emails
:
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
,
email
=
email
)
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"done"
)
with
patch
.
object
(
swh_vault
,
"smtp_server"
)
as
m
:
swh_vault
.
send_notif
(
TEST_TYPE
,
TEST_SWHID
)
sent_emails
=
{
k
[
0
][
0
]
for
k
in
m
.
send_message
.
call_args_list
}
assert
{
k
[
"To"
]
for
k
in
sent_emails
}
==
set
(
emails
)
for
e
in
sent_emails
:
assert
"bot@softwareheritage.org"
in
e
[
"From"
]
assert
TEST_TYPE
in
e
[
"Subject"
]
assert
TEST_SWHID
.
object_id
.
hex
()[:
5
]
in
e
[
"Subject"
]
assert
TEST_TYPE
in
str
(
e
)
assert
"https://archive.softwareheritage.org/"
in
str
(
e
)
assert
TEST_SWHID
.
object_id
.
hex
()[:
5
]
in
str
(
e
)
assert
"--
\x20\n
"
in
str
(
e
)
# Well-formated signature!!!
# Check that the entries have been deleted and recalling the
# function does not re-send the e-mails
m
.
reset_mock
()
swh_vault
.
send_notif
(
TEST_TYPE
,
TEST_SWHID
)
m
.
assert_not_called
()
def
test_available
(
swh_vault
):
assert
not
swh_vault
.
is_available
(
TEST_TYPE
,
TEST_SWHID
)
with
mock_cooking
(
swh_vault
):
swh_vault
.
create_task
(
TEST_TYPE
,
TEST_SWHID
)
assert
not
swh_vault
.
is_available
(
TEST_TYPE
,
TEST_SWHID
)
swh_vault
.
cache
.
add
(
TEST_TYPE
,
TEST_SWHID
,
b
"content"
)
assert
not
swh_vault
.
is_available
(
TEST_TYPE
,
TEST_SWHID
)
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"done"
)
assert
swh_vault
.
is_available
(
TEST_TYPE
,
TEST_SWHID
)
def
test_fetch
(
swh_vault
):
assert
swh_vault
.
fetch
(
TEST_TYPE
,
TEST_SWHID
,
raise_notfound
=
False
)
is
None
with
pytest
.
raises
(
NotFoundExc
,
match
=
f
"{TEST_TYPE} {TEST_SWHID} is not available."
):
swh_vault
.
fetch
(
TEST_TYPE
,
TEST_SWHID
)
swhid
,
content
=
fake_cook
(
swh_vault
,
TEST_TYPE
,
b
"content"
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
swhid
)
access_ts_before
=
info
[
"ts_last_access"
]
assert
swh_vault
.
fetch
(
TEST_TYPE
,
swhid
)
==
b
"content"
info
=
swh_vault
.
progress
(
TEST_TYPE
,
swhid
)
access_ts_after
=
info
[
"ts_last_access"
]
assertTimestampAlmostNow
(
access_ts_after
)
assert
access_ts_before
<
access_ts_after
def
test_cache_expire_oldest
(
swh_vault
):
r
=
range
(
1
,
10
)
inserted
=
{}
for
i
in
r
:
sticky
=
i
==
5
content
=
b
"content
%s
"
%
str
(
i
)
.
encode
()
swhid
,
content
=
fake_cook
(
swh_vault
,
TEST_TYPE
,
content
,
sticky
)
inserted
[
i
]
=
(
swhid
,
content
)
swh_vault
.
update_access_ts
(
TEST_TYPE
,
inserted
[
2
][
0
])
swh_vault
.
update_access_ts
(
TEST_TYPE
,
inserted
[
3
][
0
])
swh_vault
.
cache_expire_oldest
(
n
=
4
)
should_be_still_here
=
{
2
,
3
,
5
,
8
,
9
}
for
i
in
r
:
assert
swh_vault
.
is_available
(
TEST_TYPE
,
inserted
[
i
][
0
])
==
(
i
in
should_be_still_here
)
def
test_cache_expire_until
(
swh_vault
):
r
=
range
(
1
,
10
)
inserted
=
{}
for
i
in
r
:
sticky
=
i
==
5
content
=
b
"content
%s
"
%
str
(
i
)
.
encode
()
swhid
,
content
=
fake_cook
(
swh_vault
,
TEST_TYPE
,
content
,
sticky
)
inserted
[
i
]
=
(
swhid
,
content
)
if
i
==
7
:
cutoff_date
=
datetime
.
datetime
.
now
()
swh_vault
.
update_access_ts
(
TEST_TYPE
,
inserted
[
2
][
0
])
swh_vault
.
update_access_ts
(
TEST_TYPE
,
inserted
[
3
][
0
])
swh_vault
.
cache_expire_until
(
date
=
cutoff_date
)
should_be_still_here
=
{
2
,
3
,
5
,
8
,
9
}
for
i
in
r
:
assert
swh_vault
.
is_available
(
TEST_TYPE
,
inserted
[
i
][
0
])
==
(
i
in
should_be_still_here
)
def
test_fail_cook_simple
(
swh_vault
):
fail_cook
(
swh_vault
,
TEST_TYPE
,
TEST_SWHID
,
"error42"
)
assert
not
swh_vault
.
is_available
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"progress_msg"
]
==
"error42"
def
test_send_failure_email
(
swh_vault
):
with
mock_cooking
(
swh_vault
):
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
,
email
=
"a@example.com"
)
swh_vault
.
set_status
(
TEST_TYPE
,
TEST_SWHID
,
"failed"
)
swh_vault
.
set_progress
(
TEST_TYPE
,
TEST_SWHID
,
"test error"
)
with
patch
.
object
(
swh_vault
,
"smtp_server"
)
as
m
:
swh_vault
.
send_notif
(
TEST_TYPE
,
TEST_SWHID
)
e
=
[
k
[
0
][
0
]
for
k
in
m
.
send_message
.
call_args_list
][
0
]
assert
e
[
"To"
]
==
"a@example.com"
assert
"bot@softwareheritage.org"
in
e
[
"From"
]
assert
TEST_TYPE
in
e
[
"Subject"
]
assert
TEST_SWHID
.
object_id
.
hex
()[:
5
]
in
e
[
"Subject"
]
assert
"fail"
in
e
[
"Subject"
]
assert
TEST_TYPE
in
str
(
e
)
assert
TEST_SWHID
.
object_id
.
hex
()[:
5
]
in
str
(
e
)
assert
"test error"
in
str
(
e
)
assert
"--
\x20\n
"
in
str
(
e
)
# Well-formated signature
def
test_retry_failed_bundle
(
swh_vault
):
fail_cook
(
swh_vault
,
TEST_TYPE
,
TEST_SWHID
,
"error42"
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"task_status"
]
==
"failed"
with
mock_cooking
(
swh_vault
):
swh_vault
.
cook
(
TEST_TYPE
,
TEST_SWHID
)
info
=
swh_vault
.
progress
(
TEST_TYPE
,
TEST_SWHID
)
assert
info
[
"task_status"
]
==
"new"
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:04 PM (10 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295425
Attached To
rDVAU Software Heritage Vault
Event Timeline
Log In to Comment