# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import datetime
import hashlib
import itertools
import random

from hypothesis import given, settings
import pytest

from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import (
    Directory,
    DirectoryEntry,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Revision,
    Snapshot,
    SnapshotBranch,
    TargetType,
)
from swh.model.swhids import ObjectType
from swh.storage.utils import now
from swh.web.tests.data import random_content, random_sha1
from swh.web.tests.helpers import fossology_missing
from swh.web.tests.strategies import new_origin, new_revision, visit_dates
from swh.web.utils import archive
from swh.web.utils.exc import BadInputExc, NotFoundExc
from swh.web.utils.typing import OriginInfo, PagedResult

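# Tests for swh.web.utils.archive, the web application's high-level lookup
# layer on top of swh-storage. Fixtures such as contents, content, origin,
# revision and archive_data are provided by the swh.web test infrastructure
# and expose sample objects loaded in the test storage backend.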
def test_lookup_multiple_hashes_all_present(contents):
    input_data = []
    expected_output = []
    for cnt in contents:
        input_data.append({"sha1": cnt["sha1"]})
        expected_output.append({"sha1": cnt["sha1"], "found": True})

    assert archive.lookup_multiple_hashes(input_data) == expected_output

def test_lookup_multiple_hashes_some_missing(contents, unknown_contents):
    input_contents = list(itertools.chain(contents, unknown_contents))
    random.shuffle(input_contents)

    input_data = []
    expected_output = []
    for cnt in input_contents:
        input_data.append({"sha1": cnt["sha1"]})
        expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents})

    assert archive.lookup_multiple_hashes(input_data) == expected_output

def test_lookup_hash_does_not_exist():
    unknown_content_ = random_content()

    actual_lookup = archive.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"])

    assert actual_lookup == {"found": None, "algo": "sha1_git"}


def test_lookup_hash_exist(archive_data, content):
    actual_lookup = archive.lookup_hash("sha1:%s" % content["sha1"])

    content_metadata = archive_data.content_get(content["sha1"])

    assert {"found": content_metadata, "algo": "sha1"} == actual_lookup


def test_search_hash_does_not_exist():
    unknown_content_ = random_content()

    actual_lookup = archive.search_hash("sha1_git:%s" % unknown_content_["sha1_git"])

    assert {"found": False} == actual_lookup


def test_search_hash_exist(content):
    actual_lookup = archive.search_hash("sha1:%s" % content["sha1"])

    assert {"found": True} == actual_lookup

def test_lookup_content_filetype(indexer_data, content):
    indexer_data.content_add_mimetype(content["sha1"])
    actual_filetype = archive.lookup_content_filetype(content["sha1"])

    expected_filetype = indexer_data.content_get_mimetype(content["sha1"])
    assert actual_filetype == expected_filetype


@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed")
def test_lookup_content_license(indexer_data, content):
    indexer_data.content_add_license(content["sha1"])
    actual_license = archive.lookup_content_license(content["sha1"])

    expected_license = indexer_data.content_get_license(content["sha1"])
    assert actual_license == expected_license

def test_stat_counters(archive_data):
    actual_stats = archive.stat_counters()
    assert actual_stats == archive_data.stat_counters()

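# The following tests draw inputs from Hypothesis strategies (new_origin,
# visit_dates, new_revision). The @subtest pattern below re-runs the inner
# test with a fresh archive_data fixture for every generated example, so
# data added in one example does not leak into the next.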
@given(new_origin(), visit_dates())
def test_lookup_origin_visits(subtest, new_origin, visit_dates):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])

        archive_data.origin_visit_add(
            [
                OriginVisit(
                    origin=new_origin.url,
                    date=ts,
                    type="git",
                )
                for ts in visit_dates
            ]
        )

        actual_origin_visits = list(
            archive.lookup_origin_visits(new_origin.url, per_page=100)
        )

        expected_visits = archive_data.origin_visit_get(new_origin.url)
        for expected_visit in expected_visits:
            expected_visit["origin"] = new_origin.url

        assert actual_origin_visits == expected_visits

@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    archive_data.origin_add([new_origin])
    visits = archive_data.origin_visit_add(
        [
            OriginVisit(
                origin=new_origin.url,
                date=ts,
                type="git",
            )
            for ts in visit_dates
        ]
    )
    visit = random.choice(visits).visit
    actual_origin_visit = archive.lookup_origin_visit(new_origin.url, visit)

    expected_visit = dict(archive_data.origin_visit_get_by(new_origin.url, visit))

    assert actual_origin_visit == expected_visit

@given(new_origin(), visit_dates())
@settings(max_examples=1)
def test_origin_visit_find_by_date_no_result(archive_data, new_origin, visit_dates):
    """An origin without any visit registered in storage should yield no visit."""
    archive_data.origin_add([new_origin])
    for visit_date in visit_dates:
        # No visit yet, so nothing will get returned
        actual_origin_visit_status = archive.origin_visit_find_by_date(
            new_origin.url, visit_date
        )
        assert actual_origin_visit_status is None

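# Timeline below: first visit at pivot - 1h, second visit at pivot + 2h.
# origin_visit_find_by_date(url, date, greater_or_equal) is expected to
# return the closest visit on or after the date when greater_or_equal is
# True, and the closest one before it otherwise, per the
# (search date, flag, expected visit id) triples checked at the end.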
@settings(max_examples=1)
@given(new_origin())
def test_origin_visit_find_by_date(archive_data, new_origin):
    # Add origin and two visits
    archive_data.origin_add([new_origin])
    pivot_date = now()
    # First visit one hour before pivot date
    first_visit_date = pivot_date - datetime.timedelta(hours=1)
    # Second visit two hours after pivot date
    second_visit_date = pivot_date + datetime.timedelta(hours=2)
    visits = archive_data.origin_visit_add(
        [
            OriginVisit(
                origin=new_origin.url,
                date=visit_date,
                type="git",
            )
            for visit_date in [first_visit_date, second_visit_date]
        ]
    )
    # Finalize visits
    visit_statuses = []
    for visit in visits:
        visit_statuses.append(
            OriginVisitStatus(
                origin=new_origin.url,
                visit=visit.visit,
                date=visit.date + datetime.timedelta(hours=1),
                type=visit.type,
                status="full",
                snapshot=None,
            )
        )
    archive_data.origin_visit_status_add(visit_statuses)

    # Check correct visit is returned when searching by date
    for search_date, greater_or_equal, expected_visit in [
        (first_visit_date, True, 1),
        (pivot_date, True, 2),
        (pivot_date, False, 1),
        (second_visit_date, True, 2),
    ]:
        origin_visit = archive.origin_visit_find_by_date(
            new_origin.url, search_date, greater_or_equal
        )
        assert origin_visit["visit"] == expected_visit

@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    archive_data.origin_add([new_origin])

    actual_origin = archive.lookup_origin({"url": new_origin.url})
    expected_origin = archive_data.origin_get([new_origin.url])[0]
    assert actual_origin == expected_origin


def test_lookup_origin_snapshots(archive_data, origin_with_multiple_visits):
    origin_url = origin_with_multiple_visits["url"]
    visits = archive_data.origin_visit_get(origin_url)

    origin_snapshots = archive.lookup_origin_snapshots(origin_with_multiple_visits)
    assert set(origin_snapshots) == {v["snapshot"] for v in visits}

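# Release lookups only accept sha1_git hexadecimal identifiers: a malformed
# or too-long checksum is rejected with BadInputExc.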
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        archive.lookup_release(invalid_sha1)
    assert e.match("Invalid checksum")


def test_lookup_release_ko_id_checksum_too_long(sha256):
    with pytest.raises(BadInputExc) as e:
        archive.lookup_release(sha256)
    assert e.match("Only sha1_git is supported.")


def test_lookup_release_multiple(archive_data, releases):
    actual_releases = list(archive.lookup_release_multiple(releases))

    expected_releases = []
    for release_id in releases:
        release_info = archive_data.release_get(release_id)
        expected_releases.append(release_info)

    assert actual_releases == expected_releases


def test_lookup_release_multiple_none_found():
    unknown_releases_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_releases = list(archive.lookup_release_multiple(unknown_releases_))

    assert actual_releases == [None] * len(unknown_releases_)

def test_lookup_directory_with_path_not_found(directory):
    path = "some/invalid/path/here"
    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory_with_path(directory, path)
    assert e.match(
        f"Directory entry with path {path} from root directory {directory} not found"
    )


def test_lookup_directory_with_path_found(archive_data, directory):
    directory_content = archive_data.directory_ls(directory)
    directory_entry = random.choice(directory_content)
    path = directory_entry["name"]

    actual_result = archive.lookup_directory_with_path(directory, path)
    assert actual_result == directory_entry


def test_lookup_release(archive_data, release):
    actual_release = archive.lookup_release(release)

    assert actual_release == archive_data.release_get(release)

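# lookup_revision_with_context(sha1_git_root, sha1_git) resolves sha1_git
# within the history of sha1_git_root. The tests below first cover its error
# cases (bad checksums, unknown revision, unknown root) and then the nominal
# case, where the expected "children" list is rebuilt from the revision log.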
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256):
    sha1_git_root = revision
    sha1_git = invalid_sha1

    with pytest.raises(BadInputExc) as e:
        archive.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Invalid checksum query string")

    sha1_git = sha256
    with pytest.raises(BadInputExc) as e:
        archive.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Only sha1_git is supported")


def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = revision
    sha1_git = unknown_revision

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision %s not found" % sha1_git)


def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
    revision, unknown_revision
):
    sha1_git_root = unknown_revision
    sha1_git = revision

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert e.match("Revision root %s not found" % sha1_git_root)

def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    root_sha1_git = ancestor_revisions["sha1_git_root"]
    for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}):
        actual_revision = archive.lookup_revision_with_context(sha1_git_root, sha1_git)

        children = []
        for rev in archive_data.revision_log(root_sha1_git):
            for p_rev in rev["parents"]:
                p_rev_hex = hash_to_hex(p_rev)
                if p_rev_hex == sha1_git:
                    children.append(rev["id"])

        expected_revision = archive_data.revision_get(sha1_git)
        expected_revision["children"] = children
        assert actual_revision == expected_revision


def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    sha1_git = non_ancestor_revisions["sha1_git"]
    root_sha1_git = non_ancestor_revisions["sha1_git_root"]

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_revision_with_context(root_sha1_git, sha1_git)
    assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git))

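# lookup_directory_with_revision resolves a path inside the root directory
# of a revision; called without a path it returns the root directory listing
# itself (see test_lookup_directory_with_revision_without_path below).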
def test_lookup_directory_with_revision_not_found():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory_with_revision(unknown_revision_)
    assert e.match("Revision %s not found" % unknown_revision_)


@given(new_revision())
def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision):
    unknown_content_ = random_content()

    dir_path = "README.md"

    # A directory that points to unknown content
    dir = Directory(
        entries=(
            DirectoryEntry(
                name=bytes(dir_path.encode("utf-8")),
                type="file",
                target=hash_to_bytes(unknown_content_["sha1_git"]),
                perms=DentryPerms.content,
            ),
        )
    )

    # Create a revision that points to a directory which points to the
    # unknown content
    new_revision = new_revision.to_dict()
    new_revision["directory"] = dir.id
    del new_revision["id"]
    new_revision = Revision.from_dict(new_revision)

    # Add the directory and revision to the in-memory storage
    archive_data.directory_add([dir])
    archive_data.revision_add([new_revision])
    new_revision_id = hash_to_hex(new_revision.id)
    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory_with_revision(new_revision_id, dir_path)
    assert e.match("Content not found for revision %s" % new_revision_id)


def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    invalid_path = "path/to/something/unknown"
    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory_with_revision(revision, invalid_path)
    assert e.match("Directory or File")
    assert e.match(invalid_path)
    assert e.match("revision %s" % revision)
    assert e.match("not found")

def test_lookup_directory_with_revision_submodules(
    archive_data, revision_with_submodules
):
    rev_sha1_git = revision_with_submodules["rev_sha1_git"]
    rev_dir_path = revision_with_submodules["rev_dir_rev_path"]

    actual_data = archive.lookup_directory_with_revision(rev_sha1_git, rev_dir_path)

    revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"])
    directory = archive_data.directory_ls(revision["directory"])
    rev_entry = next(e for e in directory if e["name"] == rev_dir_path)

    expected_data = {
        "content": archive_data.revision_get(rev_entry["target"]),
        "path": rev_dir_path,
        "revision": rev_sha1_git,
        "type": "rev",
    }

    assert actual_data == expected_data


def test_lookup_directory_with_revision_without_path(archive_data, revision):
    actual_directory_entries = archive.lookup_directory_with_revision(revision)

    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(revision_data["directory"])

    assert actual_directory_entries["type"] == "dir"
    assert actual_directory_entries["content"] == expected_directory_entries

def test_lookup_directory_with_revision_with_path(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] in ("file", "dir")
    ]
    expected_dir_entry = random.choice(dir_entries)

    actual_dir_entry = archive.lookup_directory_with_revision(
        revision, expected_dir_entry["name"]
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
    assert actual_dir_entry["path"] == expected_dir_entry["name"]
    if actual_dir_entry["type"] == "file":
        del actual_dir_entry["content"]["checksums"]["blake2s256"]
        for key in ("checksums", "status", "length"):
            assert actual_dir_entry["content"][key] == expected_dir_entry[key]
    else:
        sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"])
        assert actual_dir_entry["content"] == sub_dir_entries

def test_lookup_directory_with_revision_with_path_to_file_and_data(
    archive_data, revision
):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    expected_dir_entry = random.choice(dir_entries)
    expected_data = archive_data.content_get_data(
        expected_dir_entry["checksums"]["sha1"]
    )

    actual_dir_entry = archive.lookup_directory_with_revision(
        revision, expected_dir_entry["name"], with_data=True
    )

    assert actual_dir_entry["type"] == expected_dir_entry["type"]
    assert actual_dir_entry["revision"] == revision
    assert actual_dir_entry["path"] == expected_dir_entry["name"]
    del actual_dir_entry["content"]["checksums"]["blake2s256"]
    for key in ("checksums", "status", "length"):
        assert actual_dir_entry["content"][key] == expected_dir_entry[key]
    assert actual_dir_entry["content"]["data"] == expected_data["data"]

def test_lookup_revision(archive_data, revision):
    actual_revision = archive.lookup_revision(revision)
    assert actual_revision == archive_data.revision_get(revision)


@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    new_revision = new_revision.to_dict()
    new_revision["message"] = b"elegant fix for bug \xff"
    archive_data.revision_add([Revision.from_dict(new_revision)])

    revision = archive.lookup_revision(hash_to_hex(new_revision["id"]))
    assert revision["message"] == "elegant fix for bug \\xff"
    assert "message" in revision["decoding_failures"]

@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    archive_data.revision_add([new_revision])

    revision_message = archive.lookup_revision_message(hash_to_hex(new_revision.id))

    assert revision_message == {"message": new_revision.message}


def test_lookup_revision_msg_no_rev():
    unknown_revision_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_revision_message(unknown_revision_)
    assert e.match("Revision with sha1_git %s not found." % unknown_revision_)


def test_lookup_revision_multiple(archive_data, revisions):
    actual_revisions = list(archive.lookup_revision_multiple(revisions))

    expected_revisions = []
    for rev in revisions:
        expected_revisions.append(archive_data.revision_get(rev))

    assert actual_revisions == expected_revisions


def test_lookup_revision_multiple_none_found():
    unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()]

    actual_revisions = list(archive.lookup_revision_multiple(unknown_revisions_))

    assert actual_revisions == [None] * len(unknown_revisions_)

def test_lookup_revision_log(archive_data, revision):
    actual_revision_log = list(archive.lookup_revision_log(revision, limit=25))
    expected_revision_log = archive_data.revision_log(revision, limit=25)

    assert actual_revision_log == expected_revision_log

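# Helper for the branch-based tests: fetch the snapshot of the latest visit
# of an origin and keep only the branches targeting a revision.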
def _get_origin_branches(archive_data, origin):
    origin_visit = archive_data.origin_visit_get(origin["url"])[-1]
    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    branches = {
        k: v
        for (k, v) in snapshot["branches"].items()
        if v["target_type"] == "revision"
    }
    return branches

def test_lookup_revision_log_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_log = list(
        archive.lookup_revision_log_by(origin["url"], branch_name, None, limit=25)
    )

    expected_log = archive_data.revision_log(branches[branch_name]["target"], limit=25)

    assert actual_log == expected_log


def test_lookup_revision_log_by_notfound(origin):
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_log_by(
            origin["url"], "unknown_branch_name", None, limit=100
        )

def test_lookup_content_raw_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_content_raw("sha1:" + unknown_content_["sha1"])
    assert e.match(
        "Content with %s checksum equals to %s not found!"
        % ("sha1", unknown_content_["sha1"])
    )


def test_lookup_content_raw(archive_data, content):
    actual_content = archive.lookup_content_raw("sha256:%s" % content["sha256"])

    expected_content = archive_data.content_get_data(content["sha1"])

    assert actual_content == expected_content


def test_lookup_empty_content_raw(empty_content):
    content_raw = archive.lookup_content_raw(f"sha1_git:{empty_content['sha1_git']}")
    assert content_raw["data"] == b""


def test_lookup_content_not_found():
    unknown_content_ = random_content()

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_content("sha1:%s" % unknown_content_["sha1"])
    assert e.match(
        "Content with %s checksum equals to %s not found!"
        % ("sha1", unknown_content_["sha1"])
    )

def test_lookup_content_with_sha1(archive_data, content):
    actual_content = archive.lookup_content(f"sha1:{content['sha1']}")

    expected_content = archive_data.content_get(content["sha1"])

    assert actual_content == expected_content


def test_lookup_content_with_sha256(archive_data, content):
    actual_content = archive.lookup_content(f"sha256:{content['sha256']}")

    expected_content = archive_data.content_get(content["sha1"])

    assert actual_content == expected_content


def test_lookup_directory_bad_checksum():
    with pytest.raises(BadInputExc):
        archive.lookup_directory("directory_id")


def test_lookup_directory_not_found():
    unknown_directory_ = random_sha1()

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_directory(unknown_directory_)
    assert e.match("Directory with sha1_git %s not found" % unknown_directory_)


def test_lookup_directory(archive_data, directory):
    actual_directory_ls = list(archive.lookup_directory(directory))

    expected_directory_ls = archive_data.directory_ls(directory)

    assert actual_directory_ls == expected_directory_ls


def test_lookup_directory_empty(empty_directory):
    actual_directory_ls = list(archive.lookup_directory(empty_directory))

    assert actual_directory_ls == []

def test_lookup_revision_by_nothing_found(origin):
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_by(origin["url"], "invalid-branch-name")


def test_lookup_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    actual_revision = archive.lookup_revision_by(origin["url"], branch_name)

    expected_revision = archive_data.revision_get(branches[branch_name]["target"])

    assert actual_revision == expected_revision


def test_lookup_revision_with_context_by_ko(origin, revision):
    with pytest.raises(NotFoundExc):
        archive.lookup_revision_with_context_by(
            origin["url"], "invalid-branch-name", None, revision
        )

def test_lookup_revision_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]["target"]
    root_rev_log = archive_data.revision_log(root_rev)

    children = defaultdict(list)

    for rev in root_rev_log:
        for rev_p in rev["parents"]:
            children[rev_p].append(rev["id"])

    rev = root_rev_log[-1]["id"]

    actual_root_rev, actual_rev = archive.lookup_revision_with_context_by(
        origin["url"], branch_name, None, rev
    )

    expected_root_rev = archive_data.revision_get(root_rev)
    expected_rev = archive_data.revision_get(rev)
    expected_rev["children"] = children[rev]

    assert actual_root_rev == expected_root_rev
    assert actual_rev == expected_rev

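# lookup_revision_through dispatches on the keys of its argument: an
# unrecognized key set raises NotImplementedError, while the supported key
# sets delegate to lookup_revision_with_context_by, lookup_revision_by,
# lookup_revision_with_context or lookup_revision, as asserted below.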
def test_lookup_revision_through_ko_not_implemented():
    with pytest.raises(NotImplementedError):
        archive.lookup_revision_through({"something-unknown": 10})


def test_lookup_revision_through_with_context_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    root_rev = branches[branch_name]["target"]
    root_rev_log = archive_data.revision_log(root_rev)
    rev = root_rev_log[-1]["id"]

    assert archive.lookup_revision_through(
        {
            "origin_url": origin["url"],
            "branch_name": branch_name,
            "ts": None,
            "sha1_git": rev,
        }
    ) == archive.lookup_revision_with_context_by(origin["url"], branch_name, None, rev)


def test_lookup_revision_through_with_revision_by(archive_data, origin):
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))

    assert archive.lookup_revision_through(
        {
            "origin_url": origin["url"],
            "branch_name": branch_name,
            "ts": None,
        }
    ) == archive.lookup_revision_by(origin["url"], branch_name, None)


def test_lookup_revision_through_with_context(ancestor_revisions):
    sha1_git = ancestor_revisions["sha1_git"]
    sha1_git_root = ancestor_revisions["sha1_git_root"]

    assert archive.lookup_revision_through(
        {
            "sha1_git_root": sha1_git_root,
            "sha1_git": sha1_git,
        }
    ) == archive.lookup_revision_with_context(sha1_git_root, sha1_git)


def test_lookup_revision_through_with_revision(revision):
    assert archive.lookup_revision_through(
        {"sha1_git": revision}
    ) == archive.lookup_revision(revision)

def test_lookup_directory_through_revision_ko_not_found(revision):
    with pytest.raises(NotFoundExc):
        archive.lookup_directory_through_revision(
            {"sha1_git": revision}, "some/invalid/path"
        )


def test_lookup_directory_through_revision_ok(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert archive.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"]
    ) == (revision, archive.lookup_directory_with_revision(revision, dir_entry["name"]))


def test_lookup_directory_through_revision_ok_with_data(archive_data, revision):
    rev_data = archive_data.revision_get(revision)
    dir_entries = [
        e
        for e in archive_data.directory_ls(rev_data["directory"])
        if e["type"] == "file"
    ]
    dir_entry = random.choice(dir_entries)

    assert archive.lookup_directory_through_revision(
        {"sha1_git": revision}, dir_entry["name"], with_data=True
    ) == (
        revision,
        archive.lookup_directory_with_revision(
            revision, dir_entry["name"], with_data=True
        ),
    )

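# lookup_object is the generic entry point keyed by swhids.ObjectType; the
# three tests below cover known objects, unknown objects (NotFoundExc) and
# syntactically invalid identifiers (BadInputExc).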
def test_lookup_known_objects(
    archive_data, content, directory, release, revision, snapshot
):
    expected = archive_data.content_find(content)
    assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected

    expected = archive_data.directory_get(directory)
    assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected

    expected = archive_data.release_get(release)
    assert archive.lookup_object(ObjectType.RELEASE, release) == expected

    expected = archive_data.revision_get(revision)
    assert archive.lookup_object(ObjectType.REVISION, revision) == expected

    expected = {**archive_data.snapshot_get(snapshot), "next_branch": None}
    assert archive.lookup_object(ObjectType.SNAPSHOT, snapshot) == expected

def test_lookup_unknown_objects(
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
):
    with pytest.raises(NotFoundExc) as e:
        archive.lookup_object(ObjectType.CONTENT, unknown_content["sha1_git"])
    assert e.match(r"Content.*not found")

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_object(ObjectType.DIRECTORY, unknown_directory)
    assert e.match(r"Directory.*not found")

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_object(ObjectType.RELEASE, unknown_release)
    assert e.match(r"Release.*not found")

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_object(ObjectType.REVISION, unknown_revision)
    assert e.match(r"Revision.*not found")

    with pytest.raises(NotFoundExc) as e:
        archive.lookup_object(ObjectType.SNAPSHOT, unknown_snapshot)
    assert e.match(r"Snapshot.*not found")

def test_lookup_invalid_objects(invalid_sha1):
    with pytest.raises(BadInputExc) as e:
        archive.lookup_object(ObjectType.CONTENT, invalid_sha1)
    assert e.match("Invalid hash")

    with pytest.raises(BadInputExc) as e:
        archive.lookup_object(ObjectType.DIRECTORY, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        archive.lookup_object(ObjectType.RELEASE, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        archive.lookup_object(ObjectType.REVISION, invalid_sha1)
    assert e.match("Invalid checksum")

    with pytest.raises(BadInputExc) as e:
        archive.lookup_object(ObjectType.SNAPSHOT, invalid_sha1)
    assert e.match("Invalid checksum")

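# lookup_missing_hashes takes candidate hashes grouped by ObjectType (as
# bytes) and returns the subset unknown to the archive, as hex strings.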
def test_lookup_missing_hashes_non_present():
    missing_cnt = random_sha1()
    missing_dir = random_sha1()
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        ObjectType.CONTENT: [hash_to_bytes(missing_cnt)],
        ObjectType.DIRECTORY: [hash_to_bytes(missing_dir)],
        ObjectType.REVISION: [hash_to_bytes(missing_rev)],
        ObjectType.RELEASE: [hash_to_bytes(missing_rel)],
        ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = archive.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {
        missing_cnt,
        missing_dir,
        missing_rev,
        missing_rel,
        missing_snp,
    }


def test_lookup_missing_hashes_some_present(content, directory):
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()

    grouped_swhids = {
        ObjectType.CONTENT: [hash_to_bytes(content["sha1_git"])],
        ObjectType.DIRECTORY: [hash_to_bytes(directory)],
        ObjectType.REVISION: [hash_to_bytes(missing_rev)],
        ObjectType.RELEASE: [hash_to_bytes(missing_rel)],
        ObjectType.SNAPSHOT: [hash_to_bytes(missing_snp)],
    }

    actual_result = archive.lookup_missing_hashes(grouped_swhids)

    assert actual_result == {missing_rev, missing_rel, missing_snp}

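# Origin URL normalization: lookups are expected to tolerate an extra or
# missing trailing slash, or a single slash after the protocol, and resolve
# to the URL actually recorded in the archive.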
def test_lookup_origin_extra_trailing_slash(origin):
    origin_info = archive.lookup_origin({"url": f"{origin['url']}/"})
    assert origin_info["url"] == origin["url"]


def test_lookup_origin_missing_trailing_slash(archive_data):
    deb_origin = Origin(url="http://snapshot.debian.org/package/r-base/")
    archive_data.origin_add([deb_origin])
    origin_info = archive.lookup_origin({"url": deb_origin.url[:-1]})
    assert origin_info["url"] == deb_origin.url


def test_lookup_origin_single_slash_after_protocol(archive_data):
    origin_url = "http://snapshot.debian.org/package/r-base/"
    malformed_origin_url = "http:/snapshot.debian.org/package/r-base/"
    archive_data.origin_add([Origin(url=origin_url)])
    origin_info = archive.lookup_origin({"url": malformed_origin_url})
    assert origin_info["url"] == origin_url

@given(new_origin())
def test_lookup_origins_get_by_sha1s(origin, unknown_origin):
    hasher = hashlib.sha1()
    hasher.update(origin["url"].encode("utf-8"))
    origin_info = OriginInfo(url=origin["url"])
    origin_sha1 = hasher.hexdigest()

    hasher = hashlib.sha1()
    hasher.update(unknown_origin.url.encode("utf-8"))
    unknown_origin_sha1 = hasher.hexdigest()

    origins = list(archive.lookup_origins_by_sha1s([origin_sha1]))
    assert origins == [origin_info]

    origins = list(archive.lookup_origins_by_sha1s([origin_sha1, origin_sha1]))
    assert origins == [origin_info, origin_info]

    origins = list(archive.lookup_origins_by_sha1s([origin_sha1, unknown_origin_sha1]))
    assert origins == [origin_info, None]

def test_search_origin(origin):
    results = archive.search_origin(url_pattern=origin["url"])[0]
    assert results == [{"url": origin["url"]}]


def test_search_origin_use_ql(mocker, origin):
    ORIGIN = [{"url": origin["url"]}]

    mock_archive_search = mocker.patch("swh.web.utils.archive.search")
    mock_archive_search.origin_search.return_value = PagedResult(
        results=ORIGIN,
        next_page_token=None,
    )

    query = f"origin = '{origin['url']}'"

    results = archive.search_origin(url_pattern=query, use_ql=True)[0]
    assert results == ORIGIN

    mock_archive_search.origin_search.assert_called_with(
        query=query, page_token=None, with_visit=False, visit_types=None, limit=50
    )

def test_lookup_snapshot_sizes(archive_data, snapshot):
    branches = archive_data.snapshot_get(snapshot)["branches"]

    expected_sizes = {
        "alias": 0,
        "branch": 0,
        "release": 0,
        "revision": 0,
    }

    for _, branch_info in branches.items():
        if branch_info is not None:
            expected_sizes[branch_info["target_type"]] += 1
            if branch_info["target_type"] in ("content", "directory", "revision"):
                expected_sizes["branch"] += 1

    assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes

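# Same size computation as above, but branches under the excluded
# "refs/pull/" prefix must not be counted: of the four revision branches,
# only the two refs/heads/ ones remain.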
def test_lookup_snapshot_sizes_with_filtering(archive_data, revision):
    rev_id = hash_to_bytes(revision)

    snapshot = Snapshot(
        branches={
            b"refs/heads/master": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/heads/incoming": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/pull/1": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/pull/2": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
        },
    )
    archive_data.snapshot_add([snapshot])

    expected_sizes = {"alias": 0, "branch": 2, "release": 0, "revision": 2}

    assert (
        archive.lookup_snapshot_sizes(
            snapshot.id.hex(), branch_name_exclude_prefix="refs/pull/"
        )
        == expected_sizes
    )

def test_lookup_snapshot_alias(snapshot):
    resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD")
    assert resolved_alias is not None
    assert resolved_alias["target_type"] == "revision"
    assert resolved_alias["target"] is not None


def test_lookup_snapshot_missing(revision):
    with pytest.raises(NotFoundExc):
        archive.lookup_snapshot(revision)


def test_lookup_snapshot_empty_branch_list(archive_data, revision):
    rev_id = hash_to_bytes(revision)

    snapshot = Snapshot(
        branches={
            b"refs/heads/master": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
        },
    )
    archive_data.snapshot_add([snapshot])

    # FIXME: this test will change once the inconsistency in storage is fixed.
    # The postgres backend returns None in case of a missing branch whereas
    # the in-memory implementation (used in tests) returns a data structure;
    # hence the inconsistency.
    branches = archive.lookup_snapshot(
        hash_to_hex(snapshot.id),
        branch_name_include_substring="non-existing",
    )["branches"]
    assert not branches

def test_lookup_snapshot_branch_names_filtering(archive_data, revision):
    rev_id = hash_to_bytes(revision)

    snapshot = Snapshot(
        branches={
            b"refs/heads/master": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/heads/incoming": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/pull/1": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            b"refs/pull/2": SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
            "non_ascii_name_é".encode(): SnapshotBranch(
                target=rev_id,
                target_type=TargetType.REVISION,
            ),
        },
    )
    archive_data.snapshot_add([snapshot])

    for include_pattern, exclude_prefix, nb_results in (
        ("pull", None, 2),
        ("incoming", None, 1),
        ("é", None, 1),
        (None, "refs/heads/", 3),
        ("refs", "refs/heads/master", 3),
    ):
        branches = archive.lookup_snapshot(
            hash_to_hex(snapshot.id),
            branch_name_include_substring=include_pattern,
            branch_name_exclude_prefix=exclude_prefix,
        )["branches"]

        assert len(branches) == nb_results
        for branch_name in branches:
            if include_pattern:
                assert include_pattern in branch_name
            if exclude_prefix:
                assert not branch_name.startswith(exclude_prefix)

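# Pagination check: 40 branches are created (10 per target type and name
# pattern); filtering on the "foo" pattern and a single target type matches
# 10 branches, fetched in two pages of branches_count = 5 using the
# next_branch cursor.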
def test_lookup_snapshot_branch_names_filtering_paginated(
    archive_data, directory, revision
):
    pattern = "foo"
    nb_branches_by_target_type = 10
    branches = {}
    for i in range(nb_branches_by_target_type):
        branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(directory),
            target_type=TargetType.DIRECTORY,
        )
        branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(revision),
            target_type=TargetType.REVISION,
        )
        branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(directory),
            target_type=TargetType.DIRECTORY,
        )
        branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch(
            target=hash_to_bytes(revision),
            target_type=TargetType.REVISION,
        )

    snapshot = Snapshot(branches=branches)
    archive_data.snapshot_add([snapshot])

    branches_count = nb_branches_by_target_type // 2

    for target_type in (
        ObjectType.DIRECTORY.name.lower(),
        ObjectType.REVISION.name.lower(),
    ):
        partial_branches = archive.lookup_snapshot(
            hash_to_hex(snapshot.id),
            target_types=[target_type],
            branches_count=branches_count,
            branch_name_include_substring=pattern,
        )
        branches = partial_branches["branches"]

        assert len(branches) == branches_count
        for branch_name, branch_data in branches.items():
            assert pattern in branch_name
            assert branch_data["target_type"] == target_type
        for i in range(branches_count):
            assert f"branch/{target_type}/{pattern}{i}" in branches
        assert (
            partial_branches["next_branch"]
            == f"branch/{target_type}/{pattern}{branches_count}"
        )

        partial_branches = archive.lookup_snapshot(
            hash_to_hex(snapshot.id),
            target_types=[target_type],
            branches_from=partial_branches["next_branch"],
            branch_name_include_substring=pattern,
        )
        branches = partial_branches["branches"]

        assert len(branches) == branches_count
        for branch_name, branch_data in branches.items():
            assert pattern in branch_name
            assert branch_data["target_type"] == target_type
        for i in range(branches_count, 2 * branches_count):
            assert f"branch/{target_type}/{pattern}{i}" in branches
        assert partial_branches["next_branch"] is None