Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9344718
conftest.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
38 KB
Subscribers
None
conftest.py
View Options
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
collections
import
defaultdict
from
datetime
import
timedelta
import
functools
import
json
import
os
import
random
import
shutil
from
subprocess
import
PIPE
,
run
import
sys
import
time
from
typing
import
Any
,
Dict
,
List
,
Optional
from
_pytest.python
import
Function
from
hypothesis
import
HealthCheck
,
settings
import
pytest
from
django.contrib.auth.models
import
User
from
django.core.cache
import
cache
from
django.test.utils
import
setup_databases
# type: ignore
from
rest_framework.test
import
APIClient
,
APIRequestFactory
from
swh.model.hashutil
import
(
ALGORITHMS
,
DEFAULT_ALGORITHMS
,
hash_to_bytes
,
hash_to_hex
,
)
from
swh.model.model
import
Content
,
Directory
from
swh.model.swhids
import
ObjectType
from
swh.scheduler.tests.common
import
TASK_TYPES
from
swh.storage.algos.origin
import
origin_get_latest_visit_status
from
swh.storage.algos.revisions_walker
import
get_revisions_walker
from
swh.storage.algos.snapshot
import
snapshot_get_all_branches
,
snapshot_get_latest
from
swh.web.auth.utils
import
OIDC_SWH_WEB_CLIENT_ID
from
swh.web.common
import
converters
from
swh.web.common.origin_save
import
get_scheduler_load_task_types
from
swh.web.common.typing
import
OriginVisitInfo
from
swh.web.common.utils
import
browsers_supported_image_mimes
from
swh.web.config
import
get_config
from
swh.web.tests.data
import
(
get_tests_data
,
override_storages
,
random_content
,
random_sha1
,
random_sha256
,
)
# Used to skip some tests
ctags_json_missing
=
(
shutil
.
which
(
"ctags"
)
is
None
or
b
"+json"
not
in
run
([
"ctags"
,
"--version"
],
stdout
=
PIPE
)
.
stdout
)
fossology_missing
=
shutil
.
which
(
"nomossa"
)
is
None
# Register some hypothesis profiles
settings
.
register_profile
(
"default"
,
settings
())
# we use getattr here to keep mypy happy regardless hypothesis version
function_scoped_fixture_check
=
(
[
getattr
(
HealthCheck
,
"function_scoped_fixture"
)]
if
hasattr
(
HealthCheck
,
"function_scoped_fixture"
)
else
[]
)
suppress_health_check
=
[
HealthCheck
.
too_slow
,
HealthCheck
.
filter_too_much
,
]
+
function_scoped_fixture_check
settings
.
register_profile
(
"swh-web"
,
settings
(
deadline
=
None
,
suppress_health_check
=
suppress_health_check
,),
)
settings
.
register_profile
(
"swh-web-fast"
,
settings
(
deadline
=
None
,
max_examples
=
5
,
suppress_health_check
=
suppress_health_check
,
),
)
def
pytest_addoption
(
parser
):
parser
.
addoption
(
"--swh-web-random-seed"
,
action
=
"store"
,
default
=
None
)
def
pytest_configure
(
config
):
# Use fast hypothesis profile by default if none has been
# explicitly specified in pytest option
if
config
.
getoption
(
"--hypothesis-profile"
)
is
None
:
settings
.
load_profile
(
"swh-web-fast"
)
# Small hack in order to be able to run the unit tests
# without static assets generated by webpack.
# Those assets are not really needed for the Python tests
# but the django templates will fail to load due to missing
# generated file webpack-stats.json describing the js and css
# files to include.
# So generate a dummy webpack-stats.json file to overcome
# that issue.
test_dir
=
os
.
path
.
dirname
(
__file__
)
# location of the static folder when running tests through tox
data_dir
=
os
.
path
.
join
(
sys
.
prefix
,
"share/swh/web"
)
static_dir
=
os
.
path
.
join
(
data_dir
,
"static"
)
if
not
os
.
path
.
exists
(
static_dir
):
# location of the static folder when running tests locally with pytest
static_dir
=
os
.
path
.
join
(
test_dir
,
"../../../static"
)
webpack_stats
=
os
.
path
.
join
(
static_dir
,
"webpack-stats.json"
)
if
os
.
path
.
exists
(
webpack_stats
):
return
bundles_dir
=
os
.
path
.
join
(
test_dir
,
"../../../assets/src/bundles"
)
if
not
os
.
path
.
exists
(
bundles_dir
):
# location of the bundles folder when running tests with tox
bundles_dir
=
os
.
path
.
join
(
data_dir
,
"assets/src/bundles"
)
_
,
bundles
,
_
=
next
(
os
.
walk
(
bundles_dir
))
mock_webpack_stats
=
{
"status"
:
"done"
,
"publicPath"
:
"/static"
,
"chunks"
:
{},
"assets"
:
{},
}
for
bundle
in
bundles
:
asset
=
f
"js/{bundle}.js"
mock_webpack_stats
[
"chunks"
][
bundle
]
=
[
asset
]
mock_webpack_stats
[
"assets"
][
asset
]
=
{
"name"
:
asset
,
"publicPath"
:
f
"/static/{asset}"
,
}
with
open
(
webpack_stats
,
"w"
)
as
outfile
:
json
.
dump
(
mock_webpack_stats
,
outfile
)
_swh_web_custom_section
=
"swh-web custom section"
_random_seed_cache_key
=
"swh-web/random-seed"
@pytest.fixture
(
scope
=
"function"
,
autouse
=
True
)
def
random_seed
(
pytestconfig
):
state
=
random
.
getstate
()
seed
=
pytestconfig
.
getoption
(
"--swh-web-random-seed"
)
if
seed
is
None
:
seed
=
time
.
time
()
seed
=
int
(
seed
)
cache
.
set
(
_random_seed_cache_key
,
seed
)
random
.
seed
(
seed
)
yield
seed
random
.
setstate
(
state
)
def
pytest_report_teststatus
(
report
,
*
args
):
if
report
.
when
==
"call"
and
report
.
outcome
==
"failed"
:
seed
=
cache
.
get
(
_random_seed_cache_key
,
None
)
line
=
(
f
'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} '
f
'{report.nodeid}" to reproduce that test failure with same inputs'
)
report
.
sections
.
append
((
_swh_web_custom_section
,
line
))
def
pytest_terminal_summary
(
terminalreporter
,
*
args
):
reports
=
terminalreporter
.
getreports
(
"failed"
)
content
=
os
.
linesep
.
join
(
text
for
report
in
reports
for
secname
,
text
in
report
.
sections
if
secname
==
_swh_web_custom_section
)
if
content
:
terminalreporter
.
ensure_newline
()
terminalreporter
.
section
(
_swh_web_custom_section
,
sep
=
"-"
,
blue
=
True
,
bold
=
True
)
terminalreporter
.
line
(
content
)
# Clear Django cache before each test
@pytest.fixture
(
autouse
=
True
)
def
django_cache_cleared
():
cache
.
clear
()
# Alias rf fixture from pytest-django
@pytest.fixture
def
request_factory
(
rf
):
return
rf
# Fixture to get test client from Django REST Framework
@pytest.fixture
def
api_client
():
return
APIClient
()
# Fixture to get API request factory from Django REST Framework
@pytest.fixture
def
api_request_factory
():
return
APIRequestFactory
()
# Initialize tests data
@pytest.fixture
(
scope
=
"function"
,
autouse
=
True
)
def
tests_data
():
data
=
get_tests_data
(
reset
=
True
)
# Update swh-web configuration to use the in-memory storages
# instantiated in the tests.data module
override_storages
(
data
[
"storage"
],
data
[
"idx_storage"
],
data
[
"search"
],
data
[
"counters"
]
)
return
data
@pytest.fixture
(
scope
=
"function"
)
def
sha1
():
"""Fixture returning a valid hexadecimal sha1 value.
"""
return
random_sha1
()
@pytest.fixture
(
scope
=
"function"
)
def
invalid_sha1
():
"""Fixture returning an invalid sha1 representation.
"""
return
hash_to_hex
(
bytes
(
random
.
randint
(
0
,
255
)
for
_
in
range
(
50
)))
@pytest.fixture
(
scope
=
"function"
)
def
sha256
():
"""Fixture returning a valid hexadecimal sha256 value.
"""
return
random_sha256
()
def
_known_swh_objects
(
tests_data
,
object_type
):
return
tests_data
[
object_type
]
@pytest.fixture
(
scope
=
"function"
)
def
content
(
tests_data
):
"""Fixture returning a random content ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"contents"
))
@pytest.fixture
(
scope
=
"function"
)
def
contents
(
tests_data
):
"""Fixture returning random contents ingested into the test archive.
"""
return
random
.
choices
(
_known_swh_objects
(
tests_data
,
"contents"
),
k
=
random
.
randint
(
2
,
8
)
)
def
_new_content
(
tests_data
):
while
True
:
new_content
=
random_content
()
sha1_bytes
=
hash_to_bytes
(
new_content
[
"sha1"
])
if
tests_data
[
"storage"
]
.
content_get_data
(
sha1_bytes
)
is
None
:
return
new_content
@pytest.fixture
(
scope
=
"function"
)
def
unknown_content
(
tests_data
):
"""Fixture returning a random content not ingested into the test archive.
"""
return
_new_content
(
tests_data
)
@pytest.fixture
(
scope
=
"function"
)
def
unknown_contents
(
tests_data
):
"""Fixture returning random contents not ingested into the test archive.
"""
new_contents
=
[]
new_content_ids
=
set
()
nb_contents
=
random
.
randint
(
2
,
8
)
while
len
(
new_contents
)
!=
nb_contents
:
new_content
=
_new_content
(
tests_data
)
if
new_content
[
"sha1"
]
not
in
new_content_ids
:
new_contents
.
append
(
new_content
)
new_content_ids
.
add
(
new_content
[
"sha1"
])
return
list
(
new_contents
)
@pytest.fixture
(
scope
=
"function"
)
def
empty_content
():
"""Fixture returning the empty content ingested into the test archive.
"""
empty_content
=
Content
.
from_data
(
data
=
b
""
)
.
to_dict
()
for
algo
in
DEFAULT_ALGORITHMS
:
empty_content
[
algo
]
=
hash_to_hex
(
empty_content
[
algo
])
return
empty_content
@functools.lru_cache
(
maxsize
=
None
)
def
_content_text
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
.
startswith
(
"text/"
),
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_text
():
"""
Fixture returning a random textual content ingested into the test archive.
"""
return
random
.
choice
(
_content_text
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_text_non_utf8
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
.
startswith
(
"text/"
)
and
c
[
"encoding"
]
not
in
(
"utf-8"
,
"us-ascii"
),
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_text_non_utf8
():
"""Fixture returning a random textual content not encoded to UTF-8 ingested
into the test archive.
"""
return
random
.
choice
(
_content_text_non_utf8
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_application_no_highlight
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
.
startswith
(
"application/"
)
and
c
[
"hljs_language"
]
==
"plaintext"
,
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_application_no_highlight
():
"""Fixture returning a random textual content with mimetype
starting with application/ and no detected programming language to
highlight ingested into the test archive.
"""
return
random
.
choice
(
_content_application_no_highlight
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_text_no_highlight
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
.
startswith
(
"text/"
)
and
c
[
"hljs_language"
]
==
"plaintext"
,
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_text_no_highlight
():
"""Fixture returning a random textual content with no detected
programming language to highlight ingested into the test archive.
"""
return
random
.
choice
(
_content_text_no_highlight
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_image_type
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
in
browsers_supported_image_mimes
,
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_image_type
():
"""Fixture returning a random image content ingested into the test archive.
"""
return
random
.
choice
(
_content_image_type
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_unsupported_image_type_rendering
():
return
list
(
filter
(
lambda
c
:
c
[
"mimetype"
]
.
startswith
(
"image/"
)
and
c
[
"mimetype"
]
not
in
browsers_supported_image_mimes
,
_known_swh_objects
(
get_tests_data
(),
"contents"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_unsupported_image_type_rendering
():
"""Fixture returning a random image content ingested into the test archive that
can not be rendered by browsers.
"""
return
random
.
choice
(
_content_unsupported_image_type_rendering
())
@functools.lru_cache
(
maxsize
=
None
)
def
_content_utf8_detected_as_binary
():
def
utf8_binary_detected
(
content
):
if
content
[
"encoding"
]
!=
"binary"
:
return
False
try
:
content
[
"raw_data"
]
.
decode
(
"utf-8"
)
except
Exception
:
return
False
else
:
return
True
return
list
(
filter
(
utf8_binary_detected
,
_known_swh_objects
(
get_tests_data
(),
"contents"
))
)
@pytest.fixture
(
scope
=
"function"
)
def
content_utf8_detected_as_binary
():
"""Fixture returning a random textual content detected as binary
by libmagic while they are valid UTF-8 encoded files.
"""
return
random
.
choice
(
_content_utf8_detected_as_binary
())
@pytest.fixture
(
scope
=
"function"
)
def
contents_with_ctags
():
"""
Fixture returning contents ingested into the test archive.
Those contents are ctags compatible, that is running ctags on those lay results.
"""
return
{
"sha1s"
:
[
"0ab37c02043ebff946c1937523f60aadd0844351"
,
"15554cf7608dde6bfefac7e3d525596343a85b6f"
,
"2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd"
,
"30acd0b47fc25e159e27a980102ddb1c4bea0b95"
,
"4f81f05aaea3efb981f9d90144f746d6b682285b"
,
"5153aa4b6e4455a62525bc4de38ed0ff6e7dd682"
,
"59d08bafa6a749110dfb65ba43a61963d5a5bf9f"
,
"7568285b2d7f31ae483ae71617bd3db873deaa2c"
,
"7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4"
,
"8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03"
,
"9b3557f1ab4111c8607a4f2ea3c1e53c6992916c"
,
"9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd"
,
"c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b"
,
"e89e55a12def4cd54d5bff58378a3b5119878eb7"
,
"e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e"
,
"eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5"
,
],
"symbol_name"
:
"ABS"
,
}
@pytest.fixture
(
scope
=
"function"
)
def
directory
(
tests_data
):
"""Fixture returning a random directory ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"directories"
))
@functools.lru_cache
(
maxsize
=
None
)
def
_directory_with_entry_type
(
type_
):
tests_data
=
get_tests_data
()
return
list
(
filter
(
lambda
d
:
any
(
[
e
[
"type"
]
==
type_
for
e
in
list
(
tests_data
[
"storage"
]
.
directory_ls
(
hash_to_bytes
(
d
)))
]
),
_known_swh_objects
(
tests_data
,
"directories"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
directory_with_subdirs
():
"""Fixture returning a random directory containing sub directories ingested
into the test archive.
"""
return
random
.
choice
(
_directory_with_entry_type
(
"dir"
))
@pytest.fixture
(
scope
=
"function"
)
def
directory_with_files
():
"""Fixture returning a random directory containing at least one regular file.
"""
return
random
.
choice
(
_directory_with_entry_type
(
"file"
))
@pytest.fixture
(
scope
=
"function"
)
def
unknown_directory
(
tests_data
):
"""Fixture returning a random directory not ingested into the test archive.
"""
while
True
:
new_directory
=
random_sha1
()
sha1_bytes
=
hash_to_bytes
(
new_directory
)
if
list
(
tests_data
[
"storage"
]
.
directory_missing
([
sha1_bytes
])):
return
new_directory
@pytest.fixture
(
scope
=
"function"
)
def
empty_directory
():
"""Fixture returning the empty directory ingested into the test archive.
"""
return
Directory
(
entries
=
())
.
id
.
hex
()
@pytest.fixture
(
scope
=
"function"
)
def
revision
(
tests_data
):
"""Fixturereturning a random revision ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"revisions"
))
@pytest.fixture
(
scope
=
"function"
)
def
revisions
(
tests_data
):
"""Fixture returning random revisions ingested into the test archive.
"""
return
random
.
choices
(
_known_swh_objects
(
tests_data
,
"revisions"
),
k
=
random
.
randint
(
2
,
8
),
)
@pytest.fixture
(
scope
=
"function"
)
def
revisions_list
(
tests_data
):
"""Fixture returning random revisions ingested into the test archive.
"""
def
gen_revisions_list
(
size
):
return
random
.
choices
(
_known_swh_objects
(
tests_data
,
"revisions"
),
k
=
size
,)
return
gen_revisions_list
@pytest.fixture
(
scope
=
"function"
)
def
unknown_revision
(
tests_data
):
"""Fixture returning a random revision not ingested into the test archive.
"""
while
True
:
new_revision
=
random_sha1
()
sha1_bytes
=
hash_to_bytes
(
new_revision
)
if
tests_data
[
"storage"
]
.
revision_get
([
sha1_bytes
])[
0
]
is
None
:
return
new_revision
def
_get_origin_dfs_revisions_walker
(
tests_data
):
storage
=
tests_data
[
"storage"
]
origin
=
random
.
choice
(
tests_data
[
"origins"
][:
-
1
])
snapshot
=
snapshot_get_latest
(
storage
,
origin
[
"url"
])
if
snapshot
.
branches
[
b
"HEAD"
]
.
target_type
.
value
==
"alias"
:
target
=
snapshot
.
branches
[
b
"HEAD"
]
.
target
head
=
snapshot
.
branches
[
target
]
.
target
else
:
head
=
snapshot
.
branches
[
b
"HEAD"
]
.
target
return
get_revisions_walker
(
"dfs"
,
storage
,
head
)
@functools.lru_cache
(
maxsize
=
None
)
def
_ancestor_revisions_data
():
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker
=
_get_origin_dfs_revisions_walker
(
get_tests_data
())
master_revisions
=
[]
children
=
defaultdict
(
list
)
init_rev_found
=
False
# get revisions only authored in the master branch
for
rev
in
revisions_walker
:
for
rev_p
in
rev
[
"parents"
]:
children
[
rev_p
]
.
append
(
rev
[
"id"
])
if
not
init_rev_found
:
master_revisions
.
append
(
rev
)
if
not
rev
[
"parents"
]:
init_rev_found
=
True
return
master_revisions
,
children
@pytest.fixture
(
scope
=
"function"
)
def
ancestor_revisions
():
"""Fixture returning a pair of revisions ingested into the test archive
with an ancestor relation.
"""
master_revisions
,
children
=
_ancestor_revisions_data
()
# head revision
root_rev
=
master_revisions
[
0
]
# pick a random revision, different from head, only authored
# in the master branch
ancestor_rev_idx
=
random
.
choice
(
list
(
range
(
1
,
len
(
master_revisions
)
-
1
)))
ancestor_rev
=
master_revisions
[
ancestor_rev_idx
]
ancestor_child_revs
=
children
[
ancestor_rev
[
"id"
]]
return
{
"sha1_git_root"
:
hash_to_hex
(
root_rev
[
"id"
]),
"sha1_git"
:
hash_to_hex
(
ancestor_rev
[
"id"
]),
"children"
:
[
hash_to_hex
(
r
)
for
r
in
ancestor_child_revs
],
}
@functools.lru_cache
(
maxsize
=
None
)
def
_non_ancestor_revisions_data
():
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker
=
_get_origin_dfs_revisions_walker
(
get_tests_data
())
merge_revs
=
[]
children
=
defaultdict
(
list
)
# get all merge revisions
for
rev
in
revisions_walker
:
if
len
(
rev
[
"parents"
])
>
1
:
merge_revs
.
append
(
rev
)
for
rev_p
in
rev
[
"parents"
]:
children
[
rev_p
]
.
append
(
rev
[
"id"
])
return
merge_revs
,
children
@pytest.fixture
(
scope
=
"function"
)
def
non_ancestor_revisions
():
"""Fixture returning a pair of revisions ingested into the test archive
with no ancestor relation.
"""
merge_revs
,
children
=
_non_ancestor_revisions_data
()
# find a merge revisions whose parents have a unique child revision
random
.
shuffle
(
merge_revs
)
selected_revs
=
None
for
merge_rev
in
merge_revs
:
if
all
(
len
(
children
[
rev_p
])
==
1
for
rev_p
in
merge_rev
[
"parents"
]):
selected_revs
=
merge_rev
[
"parents"
]
return
{
"sha1_git_root"
:
hash_to_hex
(
selected_revs
[
0
]),
"sha1_git"
:
hash_to_hex
(
selected_revs
[
1
]),
}
@pytest.fixture
(
scope
=
"function"
)
def
revision_with_submodules
():
"""Fixture returning a revision that is known to
point to a directory with revision entries (aka git submodules)
"""
return
{
"rev_sha1_git"
:
"ffcb69001f3f6745dfd5b48f72ab6addb560e234"
,
"rev_dir_sha1_git"
:
"d92a21446387fa28410e5a74379c934298f39ae2"
,
"rev_dir_rev_path"
:
"libtess2"
,
}
@pytest.fixture
(
scope
=
"function"
)
def
release
(
tests_data
):
"""Fixture returning a random release ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"releases"
))
@pytest.fixture
(
scope
=
"function"
)
def
releases
(
tests_data
):
"""Fixture returning random releases ingested into the test archive.
"""
return
random
.
choices
(
_known_swh_objects
(
tests_data
,
"releases"
),
k
=
random
.
randint
(
2
,
8
)
)
@pytest.fixture
(
scope
=
"function"
)
def
unknown_release
(
tests_data
):
"""Fixture returning a random release not ingested into the test archive.
"""
while
True
:
new_release
=
random_sha1
()
sha1_bytes
=
hash_to_bytes
(
new_release
)
if
tests_data
[
"storage"
]
.
release_get
([
sha1_bytes
])[
0
]
is
None
:
return
new_release
@pytest.fixture
(
scope
=
"function"
)
def
snapshot
(
tests_data
):
"""Fixture returning a random snapshot ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"snapshots"
))
@pytest.fixture
(
scope
=
"function"
)
def
unknown_snapshot
(
tests_data
):
"""Fixture returning a random snapshot not ingested into the test archive.
"""
while
True
:
new_snapshot
=
random_sha1
()
sha1_bytes
=
hash_to_bytes
(
new_snapshot
)
if
tests_data
[
"storage"
]
.
snapshot_get_branches
(
sha1_bytes
)
is
None
:
return
new_snapshot
@pytest.fixture
(
scope
=
"function"
)
def
origin
(
tests_data
):
"""Fixture returning a random origin ingested into the test archive.
"""
return
random
.
choice
(
_known_swh_objects
(
tests_data
,
"origins"
))
@functools.lru_cache
(
maxsize
=
None
)
def
_origin_with_multiple_visits
():
tests_data
=
get_tests_data
()
origins
=
[]
storage
=
tests_data
[
"storage"
]
for
origin
in
tests_data
[
"origins"
]:
visit_page
=
storage
.
origin_visit_get
(
origin
[
"url"
])
if
len
(
visit_page
.
results
)
>
1
:
origins
.
append
(
origin
)
return
origins
@pytest.fixture
(
scope
=
"function"
)
def
origin_with_multiple_visits
():
"""Fixture returning a random origin with multiple visits ingested
into the test archive.
"""
return
random
.
choice
(
_origin_with_multiple_visits
())
@functools.lru_cache
(
maxsize
=
None
)
def
_origin_with_releases
():
tests_data
=
get_tests_data
()
origins
=
[]
for
origin
in
tests_data
[
"origins"
]:
snapshot
=
snapshot_get_latest
(
tests_data
[
"storage"
],
origin
[
"url"
])
if
any
([
b
.
target_type
.
value
==
"release"
for
b
in
snapshot
.
branches
.
values
()]):
origins
.
append
(
origin
)
return
origins
@pytest.fixture
(
scope
=
"function"
)
def
origin_with_releases
():
"""Fixture returning a random origin with releases ingested into the test archive.
"""
return
random
.
choice
(
_origin_with_releases
())
@functools.lru_cache
(
maxsize
=
None
)
def
_origin_with_pull_request_branches
():
tests_data
=
get_tests_data
()
origins
=
[]
storage
=
tests_data
[
"storage"
]
for
origin
in
storage
.
origin_list
(
limit
=
1000
)
.
results
:
snapshot
=
snapshot_get_latest
(
storage
,
origin
.
url
)
if
any
([
b
"refs/pull/"
in
b
for
b
in
snapshot
.
branches
]):
origins
.
append
(
origin
)
return
origins
@pytest.fixture
(
scope
=
"function"
)
def
origin_with_pull_request_branches
():
"""Fixture returning a random origin with pull request branches ingested
into the test archive.
"""
return
random
.
choice
(
_origin_with_pull_request_branches
())
@functools.lru_cache
(
maxsize
=
None
)
def
_object_type_swhid
(
object_type
):
return
list
(
filter
(
lambda
swhid
:
swhid
.
object_type
==
object_type
,
_known_swh_objects
(
get_tests_data
(),
"swhids"
),
)
)
@pytest.fixture
(
scope
=
"function"
)
def
content_swhid
():
"""Fixture returning a qualified SWHID for a random content object
ingested into the test archive.
"""
return
random
.
choice
(
_object_type_swhid
(
ObjectType
.
CONTENT
))
@pytest.fixture
(
scope
=
"function"
)
def
directory_swhid
():
"""Fixture returning a qualified SWHID for a random directory object
ingested into the test archive.
"""
return
random
.
choice
(
_object_type_swhid
(
ObjectType
.
DIRECTORY
))
@pytest.fixture
(
scope
=
"function"
)
def
release_swhid
():
"""Fixture returning a qualified SWHID for a random release object
ingested into the test archive.
"""
return
random
.
choice
(
_object_type_swhid
(
ObjectType
.
RELEASE
))
@pytest.fixture
(
scope
=
"function"
)
def
revision_swhid
():
"""Fixture returning a qualified SWHID for a random revision object
ingested into the test archive.
"""
return
random
.
choice
(
_object_type_swhid
(
ObjectType
.
REVISION
))
@pytest.fixture
(
scope
=
"function"
)
def
snapshot_swhid
():
"""Fixture returning a qualified SWHID for a snapshot object
ingested into the test archive.
"""
return
random
.
choice
(
_object_type_swhid
(
ObjectType
.
SNAPSHOT
))
# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture
(
scope
=
"function"
)
def
archive_data
(
tests_data
):
return
_ArchiveData
(
tests_data
)
# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture
(
scope
=
"function"
)
def
indexer_data
(
tests_data
):
return
_IndexerData
(
tests_data
)
# Custom data directory for requests_mock
@pytest.fixture
def
datadir
():
return
os
.
path
.
join
(
os
.
path
.
abspath
(
os
.
path
.
dirname
(
__file__
)),
"resources"
)
class
_ArchiveData
:
"""
Helper class to manage data from a sample test archive.
It is initialized with a reference to an in-memory storage
containing raw tests data.
It is basically a proxy to Storage interface but it overrides some methods
to retrieve those tests data in a json serializable format in order to ease
tests implementation.
"""
def
__init__
(
self
,
tests_data
):
self
.
storage
=
tests_data
[
"storage"
]
def
__getattr__
(
self
,
key
):
if
key
==
"storage"
:
raise
AttributeError
(
key
)
# Forward calls to non overridden Storage methods to wrapped
# storage instance
return
getattr
(
self
.
storage
,
key
)
def
content_find
(
self
,
content
:
Dict
[
str
,
Any
])
->
Dict
[
str
,
Any
]:
cnt_ids_bytes
=
{
algo_hash
:
hash_to_bytes
(
content
[
algo_hash
])
for
algo_hash
in
ALGORITHMS
if
content
.
get
(
algo_hash
)
}
cnt
=
self
.
storage
.
content_find
(
cnt_ids_bytes
)
return
converters
.
from_content
(
cnt
[
0
]
.
to_dict
())
if
cnt
else
cnt
def
content_get
(
self
,
cnt_id
:
str
)
->
Dict
[
str
,
Any
]:
cnt_id_bytes
=
hash_to_bytes
(
cnt_id
)
content
=
self
.
storage
.
content_get
([
cnt_id_bytes
])[
0
]
if
content
:
content_d
=
content
.
to_dict
()
content_d
.
pop
(
"ctime"
,
None
)
else
:
content_d
=
None
return
converters
.
from_swh
(
content_d
,
hashess
=
{
"sha1"
,
"sha1_git"
,
"sha256"
,
"blake2s256"
}
)
def
content_get_data
(
self
,
cnt_id
:
str
)
->
Optional
[
Dict
[
str
,
Any
]]:
cnt_id_bytes
=
hash_to_bytes
(
cnt_id
)
cnt_data
=
self
.
storage
.
content_get_data
(
cnt_id_bytes
)
if
cnt_data
is
None
:
return
None
return
converters
.
from_content
({
"data"
:
cnt_data
,
"sha1"
:
cnt_id_bytes
})
def
directory_get
(
self
,
dir_id
):
return
{
"id"
:
dir_id
,
"content"
:
self
.
directory_ls
(
dir_id
)}
def
directory_ls
(
self
,
dir_id
):
cnt_id_bytes
=
hash_to_bytes
(
dir_id
)
dir_content
=
map
(
converters
.
from_directory_entry
,
self
.
storage
.
directory_ls
(
cnt_id_bytes
)
)
return
list
(
dir_content
)
def
release_get
(
self
,
rel_id
:
str
)
->
Optional
[
Dict
[
str
,
Any
]]:
rel_id_bytes
=
hash_to_bytes
(
rel_id
)
rel_data
=
self
.
storage
.
release_get
([
rel_id_bytes
])[
0
]
return
converters
.
from_release
(
rel_data
)
if
rel_data
else
None
def
revision_get
(
self
,
rev_id
:
str
)
->
Optional
[
Dict
[
str
,
Any
]]:
rev_id_bytes
=
hash_to_bytes
(
rev_id
)
rev_data
=
self
.
storage
.
revision_get
([
rev_id_bytes
])[
0
]
return
converters
.
from_revision
(
rev_data
)
if
rev_data
else
None
def
revision_log
(
self
,
rev_id
,
limit
=
None
):
rev_id_bytes
=
hash_to_bytes
(
rev_id
)
return
list
(
map
(
converters
.
from_revision
,
self
.
storage
.
revision_log
([
rev_id_bytes
],
limit
=
limit
),
)
)
def
snapshot_get_latest
(
self
,
origin_url
):
snp
=
snapshot_get_latest
(
self
.
storage
,
origin_url
)
return
converters
.
from_snapshot
(
snp
.
to_dict
())
def
origin_get
(
self
,
origin_urls
):
origins
=
self
.
storage
.
origin_get
(
origin_urls
)
return
[
converters
.
from_origin
(
o
.
to_dict
())
for
o
in
origins
]
def
origin_visit_get
(
self
,
origin_url
):
next_page_token
=
None
visits
=
[]
while
True
:
visit_page
=
self
.
storage
.
origin_visit_get
(
origin_url
,
page_token
=
next_page_token
)
next_page_token
=
visit_page
.
next_page_token
for
visit
in
visit_page
.
results
:
visit_status
=
self
.
storage
.
origin_visit_status_get_latest
(
origin_url
,
visit
.
visit
)
visits
.
append
(
converters
.
from_origin_visit
(
{
**
visit_status
.
to_dict
(),
"type"
:
visit
.
type
}
)
)
if
not
next_page_token
:
break
return
visits
def
origin_visit_get_by
(
self
,
origin_url
:
str
,
visit_id
:
int
)
->
OriginVisitInfo
:
visit
=
self
.
storage
.
origin_visit_get_by
(
origin_url
,
visit_id
)
assert
visit
is
not
None
visit_status
=
self
.
storage
.
origin_visit_status_get_latest
(
origin_url
,
visit_id
)
assert
visit_status
is
not
None
return
converters
.
from_origin_visit
(
{
**
visit_status
.
to_dict
(),
"type"
:
visit
.
type
}
)
def
origin_visit_status_get_latest
(
self
,
origin_url
,
type
:
Optional
[
str
]
=
None
,
allowed_statuses
:
Optional
[
List
[
str
]]
=
None
,
require_snapshot
:
bool
=
False
,
):
visit_status
=
origin_get_latest_visit_status
(
self
.
storage
,
origin_url
,
type
=
type
,
allowed_statuses
=
allowed_statuses
,
require_snapshot
=
require_snapshot
,
)
return
(
converters
.
from_origin_visit
(
visit_status
.
to_dict
())
if
visit_status
else
None
)
def
snapshot_get
(
self
,
snapshot_id
):
snp
=
snapshot_get_all_branches
(
self
.
storage
,
hash_to_bytes
(
snapshot_id
))
return
converters
.
from_snapshot
(
snp
.
to_dict
())
def
snapshot_get_branches
(
self
,
snapshot_id
,
branches_from
=
""
,
branches_count
=
1000
,
target_types
=
None
):
partial_branches
=
self
.
storage
.
snapshot_get_branches
(
hash_to_bytes
(
snapshot_id
),
branches_from
.
encode
(),
branches_count
,
target_types
,
)
return
converters
.
from_partial_branches
(
partial_branches
)
def
snapshot_get_head
(
self
,
snapshot
):
if
snapshot
[
"branches"
][
"HEAD"
][
"target_type"
]
==
"alias"
:
target
=
snapshot
[
"branches"
][
"HEAD"
][
"target"
]
head
=
snapshot
[
"branches"
][
target
][
"target"
]
else
:
head
=
snapshot
[
"branches"
][
"HEAD"
][
"target"
]
return
head
def
snapshot_count_branches
(
self
,
snapshot_id
):
counts
=
dict
.
fromkeys
((
"alias"
,
"release"
,
"revision"
),
0
)
counts
.
update
(
self
.
storage
.
snapshot_count_branches
(
hash_to_bytes
(
snapshot_id
)))
counts
.
pop
(
None
,
None
)
return
counts
class
_IndexerData
:
"""
Helper class to manage indexer tests data
It is initialized with a reference to an in-memory indexer storage
containing raw tests data.
It also defines class methods to retrieve those tests data in
a json serializable format in order to ease tests implementation.
"""
def
__init__
(
self
,
tests_data
):
self
.
idx_storage
=
tests_data
[
"idx_storage"
]
self
.
mimetype_indexer
=
tests_data
[
"mimetype_indexer"
]
self
.
license_indexer
=
tests_data
[
"license_indexer"
]
self
.
ctags_indexer
=
tests_data
[
"ctags_indexer"
]
def
content_add_mimetype
(
self
,
cnt_id
):
self
.
mimetype_indexer
.
run
([
hash_to_bytes
(
cnt_id
)])
def
content_get_mimetype
(
self
,
cnt_id
):
mimetype
=
self
.
idx_storage
.
content_mimetype_get
([
hash_to_bytes
(
cnt_id
)])[
0
]
.
to_dict
()
return
converters
.
from_filetype
(
mimetype
)
def
content_add_license
(
self
,
cnt_id
):
self
.
license_indexer
.
run
([
hash_to_bytes
(
cnt_id
)])
def
content_get_license
(
self
,
cnt_id
):
cnt_id_bytes
=
hash_to_bytes
(
cnt_id
)
licenses
=
self
.
idx_storage
.
content_fossology_license_get
([
cnt_id_bytes
])
for
license
in
licenses
:
yield
converters
.
from_swh
(
license
.
to_dict
(),
hashess
=
{
"id"
})
def
content_add_ctags
(
self
,
cnt_id
):
self
.
ctags_indexer
.
run
([
hash_to_bytes
(
cnt_id
)])
def
content_get_ctags
(
self
,
cnt_id
):
cnt_id_bytes
=
hash_to_bytes
(
cnt_id
)
ctags
=
self
.
idx_storage
.
content_ctags_get
([
cnt_id_bytes
])
for
ctag
in
ctags
:
yield
converters
.
from_swh
(
ctag
,
hashess
=
{
"id"
})
@pytest.fixture
def
keycloak_oidc
(
keycloak_oidc
,
mocker
):
keycloak_config
=
get_config
()[
"keycloak"
]
keycloak_oidc
.
server_url
=
keycloak_config
[
"server_url"
]
keycloak_oidc
.
realm_name
=
keycloak_config
[
"realm_name"
]
keycloak_oidc
.
client_id
=
OIDC_SWH_WEB_CLIENT_ID
keycloak_oidc_client
=
mocker
.
patch
(
"swh.web.auth.views.keycloak_oidc_client"
)
keycloak_oidc_client
.
return_value
=
keycloak_oidc
return
keycloak_oidc
@pytest.fixture
def
subtest
(
request
):
"""A hack to explicitly set up and tear down fixtures.
This fixture allows you to set up and tear down fixtures within the test
function itself. This is useful (necessary!) for using Hypothesis inside
pytest, as hypothesis will call the test function multiple times, without
setting up or tearing down fixture state as it is normally the case.
Copied from the pytest-subtesthack project, public domain license
(https://github.com/untitaker/pytest-subtesthack).
"""
parent_test
=
request
.
node
def
inner
(
func
):
if
hasattr
(
Function
,
"from_parent"
):
item
=
Function
.
from_parent
(
parent_test
,
name
=
request
.
function
.
__name__
+
"[]"
,
originalname
=
request
.
function
.
__name__
,
callobj
=
func
,
)
else
:
item
=
Function
(
name
=
request
.
function
.
__name__
+
"[]"
,
parent
=
parent_test
,
callobj
=
func
)
nextitem
=
parent_test
# prevents pytest from tearing down module fixtures
item
.
ihook
.
pytest_runtest_setup
(
item
=
item
)
item
.
ihook
.
pytest_runtest_call
(
item
=
item
)
item
.
ihook
.
pytest_runtest_teardown
(
item
=
item
,
nextitem
=
nextitem
)
return
inner
@pytest.fixture
def
swh_scheduler
(
swh_scheduler
):
config
=
get_config
()
scheduler
=
config
[
"scheduler"
]
config
[
"scheduler"
]
=
swh_scheduler
# create load-git and load-hg task types
for
task_type
in
TASK_TYPES
.
values
():
# see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa
task_type
[
"type"
]
=
task_type
[
"type"
]
.
replace
(
"load-test-"
,
"load-"
,
1
)
swh_scheduler
.
create_task_type
(
task_type
)
# create load-svn task type
swh_scheduler
.
create_task_type
(
{
"type"
:
"load-svn"
,
"description"
:
"Update a Subversion repository"
,
"backend_name"
:
"swh.loader.svn.tasks.DumpMountAndLoadSvnRepository"
,
"default_interval"
:
timedelta
(
days
=
64
),
"min_interval"
:
timedelta
(
hours
=
12
),
"max_interval"
:
timedelta
(
days
=
64
),
"backoff_factor"
:
2
,
"max_queue_length"
:
None
,
"num_retries"
:
7
,
"retry_delay"
:
timedelta
(
hours
=
2
),
}
)
# create load-cvs task type
swh_scheduler
.
create_task_type
(
{
"type"
:
"load-cvs"
,
"description"
:
"Update a CVS repository"
,
"backend_name"
:
"swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository"
,
"default_interval"
:
timedelta
(
days
=
64
),
"min_interval"
:
timedelta
(
hours
=
12
),
"max_interval"
:
timedelta
(
days
=
64
),
"backoff_factor"
:
2
,
"max_queue_length"
:
None
,
"num_retries"
:
7
,
"retry_delay"
:
timedelta
(
hours
=
2
),
}
)
# create load-bzr task type
swh_scheduler
.
create_task_type
(
{
"type"
:
"load-bzr"
,
"description"
:
"Update a Bazaar repository"
,
"backend_name"
:
"swh.loader.bzr.tasks.LoadBazaar"
,
"default_interval"
:
timedelta
(
days
=
64
),
"min_interval"
:
timedelta
(
hours
=
12
),
"max_interval"
:
timedelta
(
days
=
64
),
"backoff_factor"
:
2
,
"max_queue_length"
:
None
,
"num_retries"
:
7
,
"retry_delay"
:
timedelta
(
hours
=
2
),
}
)
# add method to add load-archive-files task type during tests
def
add_load_archive_task_type
():
swh_scheduler
.
create_task_type
(
{
"type"
:
"load-archive-files"
,
"description"
:
"Load tarballs"
,
"backend_name"
:
"swh.loader.package.archive.tasks.LoadArchive"
,
"default_interval"
:
timedelta
(
days
=
64
),
"min_interval"
:
timedelta
(
hours
=
12
),
"max_interval"
:
timedelta
(
days
=
64
),
"backoff_factor"
:
2
,
"max_queue_length"
:
None
,
"num_retries"
:
7
,
"retry_delay"
:
timedelta
(
hours
=
2
),
}
)
swh_scheduler
.
add_load_archive_task_type
=
add_load_archive_task_type
yield
swh_scheduler
config
[
"scheduler"
]
=
scheduler
get_scheduler_load_task_types
.
cache_clear
()
@pytest.fixture
(
scope
=
"session"
)
def
django_db_setup
(
request
,
django_db_blocker
,
postgresql_proc
):
from
django.conf
import
settings
settings
.
DATABASES
[
"default"
]
.
update
(
{
(
"ENGINE"
,
"django.db.backends.postgresql"
),
(
"NAME"
,
get_config
()[
"test_db"
][
"name"
]),
(
"USER"
,
postgresql_proc
.
user
),
(
"HOST"
,
postgresql_proc
.
host
),
(
"PORT"
,
postgresql_proc
.
port
),
}
)
with
django_db_blocker
.
unblock
():
setup_databases
(
verbosity
=
request
.
config
.
option
.
verbose
,
interactive
=
False
,
keepdb
=
False
)
@pytest.fixture
def
staff_user
():
return
User
.
objects
.
create_user
(
username
=
"admin"
,
password
=
""
,
is_staff
=
True
)
@pytest.fixture
def
regular_user
():
return
User
.
objects
.
create_user
(
username
=
"johndoe"
,
password
=
""
)
@pytest.fixture
def
regular_user2
():
return
User
.
objects
.
create_user
(
username
=
"janedoe"
,
password
=
""
)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 2:43 PM (3 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3291966
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment