Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8395932
test_utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Subscribers
None
test_utils.py
View Options
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
email
from
email.headerregistry
import
Address
from
email.message
import
EmailMessage
import
email.policy
from
importlib.resources
import
open_binary
from
typing
import
List
import
pytest
from
swh.web.inbound_email
import
utils
def
test_extract_recipients
():
message
=
EmailMessage
()
assert
utils
.
extract_recipients
(
message
)
==
[]
message
[
"To"
]
=
"Test Recipient <test-recipient@example.com>"
assert
utils
.
extract_recipients
(
message
)
==
[
Address
(
display_name
=
"Test Recipient"
,
addr_spec
=
"test-recipient@example.com"
)
]
message
[
"Cc"
]
=
(
"test-recipient-2@example.com, "
"Another Test Recipient <test-recipient-3@example.com>"
)
assert
utils
.
extract_recipients
(
message
)
==
[
Address
(
display_name
=
"Test Recipient"
,
addr_spec
=
"test-recipient@example.com"
),
Address
(
addr_spec
=
"test-recipient-2@example.com"
),
Address
(
display_name
=
"Another Test Recipient"
,
addr_spec
=
"test-recipient-3@example.com"
,
),
]
del
message
[
"To"
]
assert
utils
.
extract_recipients
(
message
)
==
[
Address
(
addr_spec
=
"test-recipient-2@example.com"
),
Address
(
display_name
=
"Another Test Recipient"
,
addr_spec
=
"test-recipient-3@example.com"
,
),
]
def
test_single_recipient_matches
():
assert
(
utils
.
single_recipient_matches
(
Address
(
addr_spec
=
"test@example.com"
),
"match@example.com"
)
is
None
)
assert
utils
.
single_recipient_matches
(
Address
(
addr_spec
=
"match@example.com"
),
"match@example.com"
)
==
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"match@example.com"
),
extension
=
None
)
assert
utils
.
single_recipient_matches
(
Address
(
addr_spec
=
"MaTch+12345AbC@exaMple.Com"
),
"match@example.com"
)
==
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"MaTch+12345AbC@exaMple.Com"
),
extension
=
"12345AbC"
)
def
test_recipient_matches
():
message
=
EmailMessage
()
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[]
message
=
EmailMessage
()
message
[
"to"
]
=
"nomatch@example.com"
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[]
message
=
EmailMessage
()
message
[
"to"
]
=
"match@example.com"
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"match@example.com"
),
extension
=
None
)
]
message
=
EmailMessage
()
message
[
"to"
]
=
"match+extension@example.com"
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"match+extension@example.com"
),
extension
=
"extension"
,
)
]
message
=
EmailMessage
()
message
[
"to"
]
=
"match+weird+plussed+extension@example.com"
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"match+weird+plussed+extension@example.com"
),
extension
=
"weird+plussed+extension"
,
)
]
message
=
EmailMessage
()
message
[
"to"
]
=
"nomatch@example.com"
message
[
"cc"
]
=
", "
.
join
(
(
"match@example.com"
,
"match@notamatch.example.com"
,
"Another Match <match+extension@example.com>"
,
)
)
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
==
[
utils
.
AddressMatch
(
recipient
=
Address
(
addr_spec
=
"match@example.com"
),
extension
=
None
,
),
utils
.
AddressMatch
(
recipient
=
Address
(
display_name
=
"Another Match"
,
addr_spec
=
"match+extension@example.com"
),
extension
=
"extension"
,
),
]
def
test_recipient_matches_casemapping
():
message
=
EmailMessage
()
message
[
"to"
]
=
"match@example.com"
assert
utils
.
recipient_matches
(
message
,
"Match@Example.Com"
)
assert
utils
.
recipient_matches
(
message
,
"match@example.com"
)
message
=
EmailMessage
()
message
[
"to"
]
=
"Match+weirdCaseMapping@Example.Com"
matches
=
utils
.
recipient_matches
(
message
,
"match@example.com"
)
assert
matches
assert
matches
[
0
]
.
extension
==
"weirdCaseMapping"
def
test_get_address_for_pk
():
salt
=
"test_salt"
pks
=
[
1
,
10
,
1000
]
base_address
=
"base@example.com"
addresses
=
{
pk
:
utils
.
get_address_for_pk
(
salt
=
salt
,
base_address
=
base_address
,
pk
=
pk
)
for
pk
in
pks
}
assert
len
(
set
(
addresses
.
values
()))
==
len
(
addresses
)
for
pk
,
address
in
addresses
.
items
():
localpart
,
_
,
domain
=
address
.
partition
(
"@"
)
base_localpart
,
_
,
extension
=
localpart
.
partition
(
"+"
)
assert
domain
==
"example.com"
assert
base_localpart
==
"base"
assert
extension
.
startswith
(
f
"{pk}."
)
def
test_get_address_for_pk_salt
():
pk
=
1000
base_address
=
"base@example.com"
addresses
=
[
utils
.
get_address_for_pk
(
salt
=
salt
,
base_address
=
base_address
,
pk
=
pk
)
for
salt
in
[
"salt1"
,
"salt2"
]
]
assert
len
(
addresses
)
==
len
(
set
(
addresses
))
def
test_get_pks_from_message
():
salt
=
"test_salt"
pks
=
[
1
,
10
,
1000
]
base_address
=
"base@example.com"
addresses
=
{
pk
:
utils
.
get_address_for_pk
(
salt
=
salt
,
base_address
=
base_address
,
pk
=
pk
)
for
pk
in
pks
}
message
=
EmailMessage
()
message
[
"To"
]
=
"test@example.com"
assert
utils
.
get_pks_from_message
(
salt
,
base_address
,
message
)
==
set
()
message
=
EmailMessage
()
message
[
"To"
]
=
f
"Test Address <{addresses[1]}>"
assert
utils
.
get_pks_from_message
(
salt
,
base_address
,
message
)
==
{
1
}
message
=
EmailMessage
()
message
[
"To"
]
=
f
"Test Address <{addresses[1]}>"
message
[
"Cc"
]
=
", "
.
join
(
[
f
"Test Address <{addresses[1]}>"
,
f
"Another Test Address <{addresses[10].lower()}>"
,
"A Third Address <irrelevant@example.com>"
,
]
)
assert
utils
.
get_pks_from_message
(
salt
,
base_address
,
message
)
==
{
1
,
10
}
def
test_get_pks_from_message_logging
(
caplog
):
salt
=
"test_salt"
pks
=
[
1
,
10
,
1000
]
base_address
=
"base@example.com"
addresses
=
{
pk
:
utils
.
get_address_for_pk
(
salt
=
salt
,
base_address
=
base_address
,
pk
=
pk
)
for
pk
in
pks
}
message
=
EmailMessage
()
message
[
"To"
]
=
f
"Test Address <{base_address}>"
assert
utils
.
get_pks_from_message
(
salt
,
base_address
,
message
)
==
set
()
relevant_records
=
[
record
for
record
in
caplog
.
records
if
record
.
name
==
"swh.web.inbound_email.utils"
]
assert
len
(
relevant_records
)
==
1
assert
relevant_records
[
0
]
.
levelname
==
"DEBUG"
assert
(
f
"{base_address} cannot be matched to a request"
in
relevant_records
[
0
]
.
getMessage
()
)
# Replace the signature with "mangle{signature}"
mangled_address
=
addresses
[
1
]
.
replace
(
"."
,
".mangle"
,
1
)
message
=
EmailMessage
()
message
[
"To"
]
=
f
"Test Address <{mangled_address}>"
assert
utils
.
get_pks_from_message
(
salt
,
base_address
,
message
)
==
set
()
relevant_records
=
[
record
for
record
in
caplog
.
records
if
record
.
name
==
"swh.web.inbound_email.utils"
]
assert
len
(
relevant_records
)
==
2
assert
relevant_records
[
0
]
.
levelname
==
"DEBUG"
assert
relevant_records
[
1
]
.
levelname
==
"DEBUG"
assert
f
"{mangled_address} failed"
in
relevant_records
[
1
]
.
getMessage
()
@pytest.mark.parametrize
(
"filename,expected_parts,expected_absent"
,
(
pytest
.
param
(
"plaintext.eml"
,
[
b
"Plain text email.
\n\n
--
\n
Test User"
],
[],
id
=
"plaintext"
,
),
pytest
.
param
(
"multipart_alternative.eml"
,
[
b
"*Multipart email.*
\n\n
--
\n
Test User"
],
[],
id
=
"multipart_alternative"
,
),
pytest
.
param
(
"multipart_alternative_html_only.eml"
,
[
b
"<html>"
,
b
"<b>Multipart email (a much longer html part).</b>"
],
[
b
"<b>Multipart email (short html part)</b>"
],
id
=
"multipart_alternative_html_only"
,
),
pytest
.
param
(
"multipart_alternative_text_only.eml"
,
[
b
"*Multipart email, but a longer text part.*
\n\n
--
\n
Test User"
],
[],
id
=
"multipart_alternative_text_only"
,
),
pytest
.
param
(
"multipart_mixed.eml"
,
[
b
"This is plain text"
,
b
"and <b>this is HTML</b>"
],
[
b
"This is a multi-part message in MIME format."
],
id
=
"multipart_mixed"
,
),
pytest
.
param
(
"multipart_mixed2.eml"
,
[
b
"This is plain text"
,
b
"and this is more text"
],
[
b
"This is a multi-part message in MIME format."
],
id
=
"multipart_mixed2"
,
),
pytest
.
param
(
"multipart_mixed_text_only.eml"
,
[
b
"My test email"
],
[
b
"HTML attachment"
,
b
"text attachment"
,
b
"This is a multi-part message in MIME format."
,
],
id
=
"multipart_mixed_text_only"
,
),
pytest
.
param
(
"multipart_alternative_recursive.eml"
,
[
b
"This is plain text"
,
b
"and more plain text"
],
[
b
"this is HTML"
,
b
"This is a multi-part message in MIME format."
],
id
=
"multipart_alternative_recursive"
,
),
pytest
.
param
(
"multipart_related.eml"
,
[
b
"See the message below
\n\n
---------- Forwarded message ---------"
,
b
"Hello everyone,
\n\n
See my attachment"
,
],
[
b
"this is HTML"
,
b
"This is a multi-part message in MIME format."
],
id
=
"multipart_alternative_recursive"
,
),
),
)
def
test_get_message_plaintext
(
filename
:
str
,
expected_parts
:
List
[
bytes
],
expected_absent
:
List
[
bytes
]
):
with
open_binary
(
"swh.web.tests.inbound_email.resources"
,
filename
)
as
f
:
message
=
email
.
message_from_binary_file
(
f
,
policy
=
email
.
policy
.
default
)
assert
isinstance
(
message
,
EmailMessage
)
plaintext
=
utils
.
get_message_plaintext
(
message
)
assert
plaintext
is
not
None
if
len
(
expected_parts
)
==
1
:
assert
plaintext
==
expected_parts
[
0
]
else
:
for
part
in
expected_parts
:
assert
part
in
plaintext
for
part
in
expected_absent
:
assert
part
not
in
plaintext
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:47 PM (12 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3323974
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment