Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8393056
storage_tests.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
142 KB
Subscribers
None
storage_tests.py
View Options
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
collections
import
defaultdict
import
datetime
from
datetime
import
timedelta
import
inspect
import
itertools
import
math
import
random
from
typing
import
Any
,
ClassVar
,
Dict
,
Iterator
,
Optional
import
attr
from
hypothesis
import
HealthCheck
,
given
,
settings
,
strategies
import
pytest
from
swh.model
import
from_disk
from
swh.model.hashutil
import
hash_to_bytes
from
swh.model.hypothesis_strategies
import
objects
from
swh.model.identifiers
import
SWHID
from
swh.model.model
import
(
Content
,
Directory
,
MetadataTargetType
,
Origin
,
OriginVisit
,
OriginVisitStatus
,
Person
,
Revision
,
SkippedContent
,
Snapshot
,
TargetType
,
)
from
swh.storage
import
get_storage
from
swh.storage.common
import
origin_url_to_sha1
as
sha1
from
swh.storage.exc
import
HashCollision
,
StorageArgumentException
from
swh.storage.interface
import
ListOrder
,
PagedResult
,
StorageInterface
from
swh.storage.utils
import
content_hex_hashes
,
now
,
round_to_milliseconds
def
transform_entries
(
storage
:
StorageInterface
,
dir_
:
Directory
,
*
,
prefix
:
bytes
=
b
""
)
->
Iterator
[
Dict
[
str
,
Any
]]:
"""Iterate through a directory's entries, and yields the items 'directory_ls' is
expected to return; including content metadata for file entries."""
for
ent
in
dir_
.
entries
:
if
ent
.
type
==
"dir"
:
yield
{
"dir_id"
:
dir_
.
id
,
"type"
:
ent
.
type
,
"target"
:
ent
.
target
,
"name"
:
prefix
+
ent
.
name
,
"perms"
:
ent
.
perms
,
"status"
:
None
,
"sha1"
:
None
,
"sha1_git"
:
None
,
"sha256"
:
None
,
"length"
:
None
,
}
elif
ent
.
type
==
"file"
:
contents
=
storage
.
content_find
({
"sha1_git"
:
ent
.
target
})
assert
contents
ent_dict
=
contents
[
0
]
.
to_dict
()
for
key
in
[
"ctime"
,
"blake2s256"
]:
ent_dict
.
pop
(
key
,
None
)
ent_dict
.
update
(
{
"dir_id"
:
dir_
.
id
,
"type"
:
ent
.
type
,
"target"
:
ent
.
target
,
"name"
:
prefix
+
ent
.
name
,
"perms"
:
ent
.
perms
,
}
)
yield
ent_dict
def
assert_contents_ok
(
expected_contents
,
actual_contents
,
keys_to_check
=
{
"sha1"
,
"data"
}
):
"""Assert that a given list of contents matches on a given set of keys.
"""
for
k
in
keys_to_check
:
expected_list
=
set
([
c
.
get
(
k
)
for
c
in
expected_contents
])
actual_list
=
set
([
c
.
get
(
k
)
for
c
in
actual_contents
])
assert
actual_list
==
expected_list
,
k
class
LazyContent
(
Content
):
def
with_data
(
self
):
return
Content
.
from_dict
({
**
self
.
to_dict
(),
"data"
:
b
"42
\n
"
})
class
TestStorage
:
"""Main class for Storage testing.
This class is used as-is to test local storage (see TestLocalStorage
below) and remote storage (see TestRemoteStorage in
test_remote_storage.py.
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
class twice.
"""
maxDiff
=
None
# type: ClassVar[Optional[int]]
def
test_types
(
self
,
swh_storage_backend_config
):
"""Checks all methods of StorageInterface are implemented by this
backend, and that they have the same signature."""
# Create an instance of the protocol (which cannot be instantiated
# directly, so this creates a subclass, then instantiates it)
interface
=
type
(
"_"
,
(
StorageInterface
,),
{})()
storage
=
get_storage
(
**
swh_storage_backend_config
)
assert
"content_add"
in
dir
(
interface
)
missing_methods
=
[]
for
meth_name
in
dir
(
interface
):
if
meth_name
.
startswith
(
"_"
):
continue
interface_meth
=
getattr
(
interface
,
meth_name
)
try
:
concrete_meth
=
getattr
(
storage
,
meth_name
)
except
AttributeError
:
if
not
getattr
(
interface_meth
,
"deprecated_endpoint"
,
False
):
# The backend is missing a (non-deprecated) endpoint
missing_methods
.
append
(
meth_name
)
continue
expected_signature
=
inspect
.
signature
(
interface_meth
)
actual_signature
=
inspect
.
signature
(
concrete_meth
)
assert
expected_signature
==
actual_signature
,
meth_name
assert
missing_methods
==
[]
# If all the assertions above succeed, then this one should too.
# But there's no harm in double-checking.
# And we could replace the assertions above by this one, but unlike
# the assertions above, it doesn't explain what is missing.
assert
isinstance
(
storage
,
StorageInterface
)
def
test_check_config
(
self
,
swh_storage
):
assert
swh_storage
.
check_config
(
check_write
=
True
)
assert
swh_storage
.
check_config
(
check_write
=
False
)
def
test_content_add
(
self
,
swh_storage
,
sample_data
):
cont
=
sample_data
.
content
insertion_start_time
=
now
()
actual_result
=
swh_storage
.
content_add
([
cont
])
insertion_end_time
=
now
()
assert
actual_result
==
{
"content:add"
:
1
,
"content:add:bytes"
:
cont
.
length
,
}
assert
swh_storage
.
content_get_data
(
cont
.
sha1
)
==
cont
.
data
expected_cont
=
attr
.
evolve
(
cont
,
data
=
None
)
contents
=
[
obj
for
(
obj_type
,
obj
)
in
swh_storage
.
journal_writer
.
journal
.
objects
if
obj_type
==
"content"
]
assert
len
(
contents
)
==
1
for
obj
in
contents
:
assert
insertion_start_time
<=
obj
.
ctime
assert
obj
.
ctime
<=
insertion_end_time
assert
obj
==
expected_cont
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"content"
]
==
1
def
test_content_add_from_lazy_content
(
self
,
swh_storage
,
sample_data
):
cont
=
sample_data
.
content
lazy_content
=
LazyContent
.
from_dict
(
cont
.
to_dict
())
insertion_start_time
=
now
()
actual_result
=
swh_storage
.
content_add
([
lazy_content
])
insertion_end_time
=
now
()
assert
actual_result
==
{
"content:add"
:
1
,
"content:add:bytes"
:
cont
.
length
,
}
# the fact that we retrieve the content object from the storage with
# the correct 'data' field ensures it has been 'called'
assert
swh_storage
.
content_get_data
(
cont
.
sha1
)
==
cont
.
data
expected_cont
=
attr
.
evolve
(
lazy_content
,
data
=
None
,
ctime
=
None
)
contents
=
[
obj
for
(
obj_type
,
obj
)
in
swh_storage
.
journal_writer
.
journal
.
objects
if
obj_type
==
"content"
]
assert
len
(
contents
)
==
1
for
obj
in
contents
:
assert
insertion_start_time
<=
obj
.
ctime
assert
obj
.
ctime
<=
insertion_end_time
assert
attr
.
evolve
(
obj
,
ctime
=
None
)
.
to_dict
()
==
expected_cont
.
to_dict
()
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"content"
]
==
1
def
test_content_get_data_missing
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
=
sample_data
.
contents
[:
2
]
swh_storage
.
content_add
([
cont
])
# Query a single missing content
actual_content_data
=
swh_storage
.
content_get_data
(
cont2
.
sha1
)
assert
actual_content_data
is
None
# Check content_get does not abort after finding a missing content
actual_content_data
=
swh_storage
.
content_get_data
(
cont
.
sha1
)
assert
actual_content_data
==
cont
.
data
actual_content_data
=
swh_storage
.
content_get_data
(
cont2
.
sha1
)
assert
actual_content_data
is
None
def
test_content_add_different_input
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
=
sample_data
.
contents
[:
2
]
actual_result
=
swh_storage
.
content_add
([
cont
,
cont2
])
assert
actual_result
==
{
"content:add"
:
2
,
"content:add:bytes"
:
cont
.
length
+
cont2
.
length
,
}
def
test_content_add_twice
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
=
sample_data
.
contents
[:
2
]
actual_result
=
swh_storage
.
content_add
([
cont
])
assert
actual_result
==
{
"content:add"
:
1
,
"content:add:bytes"
:
cont
.
length
,
}
assert
len
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
1
actual_result
=
swh_storage
.
content_add
([
cont
,
cont2
])
assert
actual_result
==
{
"content:add"
:
1
,
"content:add:bytes"
:
cont2
.
length
,
}
assert
2
<=
len
(
swh_storage
.
journal_writer
.
journal
.
objects
)
<=
3
assert
len
(
swh_storage
.
content_find
(
cont
.
to_dict
()))
==
1
assert
len
(
swh_storage
.
content_find
(
cont2
.
to_dict
()))
==
1
def
test_content_add_collision
(
self
,
swh_storage
,
sample_data
):
cont1
=
sample_data
.
content
# create (corrupted) content with same sha1{,_git} but != sha256
sha256_array
=
bytearray
(
cont1
.
sha256
)
sha256_array
[
0
]
+=
1
cont1b
=
attr
.
evolve
(
cont1
,
sha256
=
bytes
(
sha256_array
))
with
pytest
.
raises
(
HashCollision
)
as
cm
:
swh_storage
.
content_add
([
cont1
,
cont1b
])
exc
=
cm
.
value
actual_algo
=
exc
.
algo
assert
actual_algo
in
[
"sha1"
,
"sha1_git"
]
actual_id
=
exc
.
hash_id
assert
actual_id
==
getattr
(
cont1
,
actual_algo
)
.
hex
()
collisions
=
exc
.
args
[
2
]
assert
len
(
collisions
)
==
2
assert
collisions
==
[
content_hex_hashes
(
cont1
.
hashes
()),
content_hex_hashes
(
cont1b
.
hashes
()),
]
assert
exc
.
colliding_content_hashes
()
==
[
cont1
.
hashes
(),
cont1b
.
hashes
(),
]
def
test_content_add_duplicate
(
self
,
swh_storage
,
sample_data
):
cont
=
sample_data
.
content
swh_storage
.
content_add
([
cont
,
cont
])
assert
swh_storage
.
content_get_data
(
cont
.
sha1
)
==
cont
.
data
def
test_content_update
(
self
,
swh_storage
,
sample_data
):
cont1
=
sample_data
.
content
if
hasattr
(
swh_storage
,
"journal_writer"
):
swh_storage
.
journal_writer
.
journal
=
None
# TODO, not supported
swh_storage
.
content_add
([
cont1
])
# alter the sha1_git for example
cont1b
=
attr
.
evolve
(
cont1
,
sha1_git
=
hash_to_bytes
(
"3a60a5275d0333bf13468e8b3dcab90f4046e654"
)
)
swh_storage
.
content_update
([
cont1b
.
to_dict
()],
keys
=
[
"sha1_git"
])
actual_contents
=
swh_storage
.
content_get
([
cont1
.
sha1
])
expected_content
=
attr
.
evolve
(
cont1b
,
data
=
None
)
assert
actual_contents
==
[
expected_content
]
def
test_content_add_metadata
(
self
,
swh_storage
,
sample_data
):
cont
=
attr
.
evolve
(
sample_data
.
content
,
data
=
None
,
ctime
=
now
())
actual_result
=
swh_storage
.
content_add_metadata
([
cont
])
assert
actual_result
==
{
"content:add"
:
1
,
}
expected_cont
=
cont
assert
swh_storage
.
content_get
([
cont
.
sha1
])
==
[
expected_cont
]
contents
=
[
obj
for
(
obj_type
,
obj
)
in
swh_storage
.
journal_writer
.
journal
.
objects
if
obj_type
==
"content"
]
assert
len
(
contents
)
==
1
for
obj
in
contents
:
obj
=
attr
.
evolve
(
obj
,
ctime
=
None
)
assert
obj
==
cont
def
test_content_add_metadata_different_input
(
self
,
swh_storage
,
sample_data
):
contents
=
sample_data
.
contents
[:
2
]
cont
=
attr
.
evolve
(
contents
[
0
],
data
=
None
,
ctime
=
now
())
cont2
=
attr
.
evolve
(
contents
[
1
],
data
=
None
,
ctime
=
now
())
actual_result
=
swh_storage
.
content_add_metadata
([
cont
,
cont2
])
assert
actual_result
==
{
"content:add"
:
2
,
}
def
test_content_add_metadata_collision
(
self
,
swh_storage
,
sample_data
):
cont1
=
attr
.
evolve
(
sample_data
.
content
,
data
=
None
,
ctime
=
now
())
# create (corrupted) content with same sha1{,_git} but != sha256
sha1_git_array
=
bytearray
(
cont1
.
sha256
)
sha1_git_array
[
0
]
+=
1
cont1b
=
attr
.
evolve
(
cont1
,
sha256
=
bytes
(
sha1_git_array
))
with
pytest
.
raises
(
HashCollision
)
as
cm
:
swh_storage
.
content_add_metadata
([
cont1
,
cont1b
])
exc
=
cm
.
value
actual_algo
=
exc
.
algo
assert
actual_algo
in
[
"sha1"
,
"sha1_git"
,
"blake2s256"
]
actual_id
=
exc
.
hash_id
assert
actual_id
==
getattr
(
cont1
,
actual_algo
)
.
hex
()
collisions
=
exc
.
args
[
2
]
assert
len
(
collisions
)
==
2
assert
collisions
==
[
content_hex_hashes
(
cont1
.
hashes
()),
content_hex_hashes
(
cont1b
.
hashes
()),
]
assert
exc
.
colliding_content_hashes
()
==
[
cont1
.
hashes
(),
cont1b
.
hashes
(),
]
def
test_skipped_content_add
(
self
,
swh_storage
,
sample_data
):
contents
=
sample_data
.
skipped_contents
[:
2
]
cont
=
contents
[
0
]
cont2
=
attr
.
evolve
(
contents
[
1
],
blake2s256
=
None
)
contents_dict
=
[
c
.
to_dict
()
for
c
in
[
cont
,
cont2
]]
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
missing
==
[
cont
.
hashes
(),
cont2
.
hashes
()]
actual_result
=
swh_storage
.
skipped_content_add
([
cont
,
cont
,
cont2
])
assert
2
<=
actual_result
.
pop
(
"skipped_content:add"
)
<=
3
assert
actual_result
==
{}
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
missing
==
[]
def
test_skipped_content_add_missing_hashes
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
=
[
attr
.
evolve
(
c
,
sha1_git
=
None
)
for
c
in
sample_data
.
skipped_contents
[:
2
]
]
contents_dict
=
[
c
.
to_dict
()
for
c
in
[
cont
,
cont2
]]
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
len
(
missing
)
==
2
actual_result
=
swh_storage
.
skipped_content_add
([
cont
,
cont
,
cont2
])
assert
2
<=
actual_result
.
pop
(
"skipped_content:add"
)
<=
3
assert
actual_result
==
{}
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
missing
==
[]
def
test_skipped_content_missing_partial_hash
(
self
,
swh_storage
,
sample_data
):
cont
=
sample_data
.
skipped_content
cont2
=
attr
.
evolve
(
cont
,
sha1_git
=
None
)
contents_dict
=
[
c
.
to_dict
()
for
c
in
[
cont
,
cont2
]]
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
len
(
missing
)
==
2
actual_result
=
swh_storage
.
skipped_content_add
([
cont
])
assert
actual_result
.
pop
(
"skipped_content:add"
)
==
1
assert
actual_result
==
{}
missing
=
list
(
swh_storage
.
skipped_content_missing
(
contents_dict
))
assert
missing
==
[
cont2
.
hashes
()]
@pytest.mark.property_based
@settings
(
deadline
=
None
)
# this test is very slow
@given
(
strategies
.
sets
(
elements
=
strategies
.
sampled_from
([
"sha256"
,
"sha1_git"
,
"blake2s256"
]),
min_size
=
0
,
)
)
def
test_content_missing
(
self
,
swh_storage
,
sample_data
,
algos
):
algos
|=
{
"sha1"
}
content
,
missing_content
=
[
sample_data
.
content2
,
sample_data
.
skipped_content
]
swh_storage
.
content_add
([
content
])
test_contents
=
[
content
.
to_dict
()]
missing_per_hash
=
defaultdict
(
list
)
for
i
in
range
(
256
):
test_content
=
missing_content
.
to_dict
()
for
hash
in
algos
:
test_content
[
hash
]
=
bytes
([
i
])
+
test_content
[
hash
][
1
:]
missing_per_hash
[
hash
]
.
append
(
test_content
[
hash
])
test_contents
.
append
(
test_content
)
assert
set
(
swh_storage
.
content_missing
(
test_contents
))
==
set
(
missing_per_hash
[
"sha1"
]
)
for
hash
in
algos
:
assert
set
(
swh_storage
.
content_missing
(
test_contents
,
key_hash
=
hash
)
)
==
set
(
missing_per_hash
[
hash
])
@pytest.mark.property_based
@given
(
strategies
.
sets
(
elements
=
strategies
.
sampled_from
([
"sha256"
,
"sha1_git"
,
"blake2s256"
]),
min_size
=
0
,
)
)
def
test_content_missing_unknown_algo
(
self
,
swh_storage
,
sample_data
,
algos
):
algos
|=
{
"sha1"
}
content
,
missing_content
=
[
sample_data
.
content2
,
sample_data
.
skipped_content
]
swh_storage
.
content_add
([
content
])
test_contents
=
[
content
.
to_dict
()]
missing_per_hash
=
defaultdict
(
list
)
for
i
in
range
(
16
):
test_content
=
missing_content
.
to_dict
()
for
hash
in
algos
:
test_content
[
hash
]
=
bytes
([
i
])
+
test_content
[
hash
][
1
:]
missing_per_hash
[
hash
]
.
append
(
test_content
[
hash
])
test_content
[
"nonexisting_algo"
]
=
b
"
\x00
"
test_contents
.
append
(
test_content
)
assert
set
(
swh_storage
.
content_missing
(
test_contents
))
==
set
(
missing_per_hash
[
"sha1"
]
)
for
hash
in
algos
:
assert
set
(
swh_storage
.
content_missing
(
test_contents
,
key_hash
=
hash
)
)
==
set
(
missing_per_hash
[
hash
])
def
test_content_missing_per_sha1
(
self
,
swh_storage
,
sample_data
):
# given
cont
=
sample_data
.
content
cont2
=
sample_data
.
content2
missing_cont
=
sample_data
.
skipped_content
missing_cont2
=
sample_data
.
skipped_content2
swh_storage
.
content_add
([
cont
,
cont2
])
# when
gen
=
swh_storage
.
content_missing_per_sha1
(
[
cont
.
sha1
,
missing_cont
.
sha1
,
cont2
.
sha1
,
missing_cont2
.
sha1
]
)
# then
assert
list
(
gen
)
==
[
missing_cont
.
sha1
,
missing_cont2
.
sha1
]
def
test_content_missing_per_sha1_git
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
=
sample_data
.
contents
[:
2
]
missing_cont
=
sample_data
.
skipped_content
missing_cont2
=
sample_data
.
skipped_content2
swh_storage
.
content_add
([
cont
,
cont2
])
contents
=
[
cont
.
sha1_git
,
cont2
.
sha1_git
,
missing_cont
.
sha1_git
,
missing_cont2
.
sha1_git
,
]
missing_contents
=
swh_storage
.
content_missing_per_sha1_git
(
contents
)
assert
list
(
missing_contents
)
==
[
missing_cont
.
sha1_git
,
missing_cont2
.
sha1_git
]
missing_contents
=
swh_storage
.
content_missing_per_sha1_git
([])
assert
list
(
missing_contents
)
==
[]
def
test_content_get_partition
(
self
,
swh_storage
,
swh_contents
):
"""content_get_partition paginates results if limit exceeded"""
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
for
c
in
swh_contents
if
c
.
status
!=
"absent"
]
actual_contents
=
[]
for
i
in
range
(
16
):
actual_result
=
swh_storage
.
content_get_partition
(
i
,
16
)
assert
actual_result
.
next_page_token
is
None
actual_contents
.
extend
(
actual_result
.
results
)
assert
len
(
actual_contents
)
==
len
(
expected_contents
)
for
content
in
actual_contents
:
assert
content
in
expected_contents
assert
content
.
ctime
is
None
def
test_content_get_partition_full
(
self
,
swh_storage
,
swh_contents
):
"""content_get_partition for a single partition returns all available contents
"""
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
for
c
in
swh_contents
if
c
.
status
!=
"absent"
]
actual_result
=
swh_storage
.
content_get_partition
(
0
,
1
)
assert
actual_result
.
next_page_token
is
None
actual_contents
=
actual_result
.
results
assert
len
(
actual_contents
)
==
len
(
expected_contents
)
for
content
in
actual_contents
:
assert
content
in
expected_contents
def
test_content_get_partition_empty
(
self
,
swh_storage
,
swh_contents
):
"""content_get_partition when at least one of the partitions is empty"""
expected_contents
=
{
cont
.
sha1
for
cont
in
swh_contents
if
cont
.
status
!=
"absent"
}
# nb_partitions = smallest power of 2 such that at least one of
# the partitions is empty
nb_partitions
=
1
<<
math
.
floor
(
math
.
log2
(
len
(
swh_contents
))
+
1
)
seen_sha1s
=
[]
for
i
in
range
(
nb_partitions
):
actual_result
=
swh_storage
.
content_get_partition
(
i
,
nb_partitions
,
limit
=
len
(
swh_contents
)
+
1
)
for
content
in
actual_result
.
results
:
seen_sha1s
.
append
(
content
.
sha1
)
# Limit is higher than the max number of results
assert
actual_result
.
next_page_token
is
None
assert
set
(
seen_sha1s
)
==
expected_contents
def
test_content_get_partition_limit_none
(
self
,
swh_storage
):
"""content_get_partition call with wrong limit input should fail"""
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"limit should not be None"
):
swh_storage
.
content_get_partition
(
1
,
16
,
limit
=
None
)
def
test_content_get_partition_pagination_generate
(
self
,
swh_storage
,
swh_contents
):
"""content_get_partition returns contents within range provided"""
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
for
c
in
swh_contents
if
c
.
status
!=
"absent"
]
# retrieve contents
actual_contents
=
[]
for
i
in
range
(
4
):
page_token
=
None
while
True
:
actual_result
=
swh_storage
.
content_get_partition
(
i
,
4
,
limit
=
3
,
page_token
=
page_token
)
actual_contents
.
extend
(
actual_result
.
results
)
page_token
=
actual_result
.
next_page_token
if
page_token
is
None
:
break
assert
len
(
actual_contents
)
==
len
(
expected_contents
)
for
content
in
actual_contents
:
assert
content
in
expected_contents
def
test_content_get
(
self
,
swh_storage
,
sample_data
):
cont1
,
cont2
=
sample_data
.
contents
[:
2
]
swh_storage
.
content_add
([
cont1
,
cont2
])
actual_contents
=
swh_storage
.
content_get
([
cont1
.
sha1
,
cont2
.
sha1
])
# we only retrieve the metadata so no data nor ctime within
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
for
c
in
[
cont1
,
cont2
]]
assert
actual_contents
==
expected_contents
for
content
in
actual_contents
:
assert
content
.
ctime
is
None
def
test_content_get_missing_sha1
(
self
,
swh_storage
,
sample_data
):
cont1
,
cont2
=
sample_data
.
contents
[:
2
]
assert
cont1
.
sha1
!=
cont2
.
sha1
missing_cont
=
sample_data
.
skipped_content
swh_storage
.
content_add
([
cont1
,
cont2
])
actual_contents
=
swh_storage
.
content_get
(
[
cont1
.
sha1
,
cont2
.
sha1
,
missing_cont
.
sha1
]
)
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
if
c
else
None
for
c
in
[
cont1
,
cont2
,
None
]
]
assert
actual_contents
==
expected_contents
def
test_content_get_random
(
self
,
swh_storage
,
sample_data
):
cont
,
cont2
,
cont3
=
sample_data
.
contents
[:
3
]
swh_storage
.
content_add
([
cont
,
cont2
,
cont3
])
assert
swh_storage
.
content_get_random
()
in
{
cont
.
sha1_git
,
cont2
.
sha1_git
,
cont3
.
sha1_git
,
}
def
test_directory_add
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
directory
=
sample_data
.
directories
[
1
]
assert
directory
.
entries
[
0
]
.
target
==
content
.
sha1_git
swh_storage
.
content_add
([
content
])
init_missing
=
list
(
swh_storage
.
directory_missing
([
directory
.
id
]))
assert
[
directory
.
id
]
==
init_missing
actual_result
=
swh_storage
.
directory_add
([
directory
])
assert
actual_result
==
{
"directory:add"
:
1
}
assert
(
"directory"
,
directory
)
in
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
actual_data
=
list
(
swh_storage
.
directory_ls
(
directory
.
id
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
directory
))
for
data
in
actual_data
:
assert
data
in
expected_data
after_missing
=
list
(
swh_storage
.
directory_missing
([
directory
.
id
]))
assert
after_missing
==
[]
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"directory"
]
==
1
def
test_directory_add_twice
(
self
,
swh_storage
,
sample_data
):
directory
=
sample_data
.
directories
[
1
]
actual_result
=
swh_storage
.
directory_add
([
directory
])
assert
actual_result
==
{
"directory:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"directory"
,
directory
)
]
actual_result
=
swh_storage
.
directory_add
([
directory
])
assert
actual_result
==
{
"directory:add"
:
0
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"directory"
,
directory
)
]
def
test_directory_ls_recursive
(
self
,
swh_storage
,
sample_data
):
# create consistent dataset regarding the directories we want to list
content
,
content2
=
sample_data
.
contents
[:
2
]
swh_storage
.
content_add
([
content
,
content2
])
dir1
,
dir2
,
dir3
=
sample_data
.
directories
[:
3
]
dir_ids
=
[
d
.
id
for
d
in
[
dir1
,
dir2
,
dir3
]]
init_missing
=
list
(
swh_storage
.
directory_missing
(
dir_ids
))
assert
init_missing
==
dir_ids
actual_result
=
swh_storage
.
directory_add
([
dir1
,
dir2
,
dir3
])
assert
actual_result
==
{
"directory:add"
:
3
}
# List directory containing one file
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir1
.
id
,
recursive
=
True
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
dir1
))
for
data
in
actual_data
:
assert
data
in
expected_data
# List directory containing a file and an unknown subdirectory
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir2
.
id
,
recursive
=
True
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
dir2
))
for
data
in
actual_data
:
assert
data
in
expected_data
# List directory containing both a known and unknown subdirectory, entries
# should be both those of the directory and of the known subdir (up to contents)
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir3
.
id
,
recursive
=
True
))
expected_data
=
list
(
itertools
.
chain
(
transform_entries
(
swh_storage
,
dir3
),
transform_entries
(
swh_storage
,
dir2
,
prefix
=
b
"subdir/"
),
)
)
for
data
in
actual_data
:
assert
data
in
expected_data
def
test_directory_ls_non_recursive
(
self
,
swh_storage
,
sample_data
):
# create consistent dataset regarding the directories we want to list
content
,
content2
=
sample_data
.
contents
[:
2
]
swh_storage
.
content_add
([
content
,
content2
])
dir1
,
dir2
,
dir3
,
_
,
dir5
=
sample_data
.
directories
[:
5
]
dir_ids
=
[
d
.
id
for
d
in
[
dir1
,
dir2
,
dir3
,
dir5
]]
init_missing
=
list
(
swh_storage
.
directory_missing
(
dir_ids
))
assert
init_missing
==
dir_ids
actual_result
=
swh_storage
.
directory_add
([
dir1
,
dir2
,
dir3
,
dir5
])
assert
actual_result
==
{
"directory:add"
:
4
}
# List directory containing a file and an unknown subdirectory
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir1
.
id
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
dir1
))
for
data
in
actual_data
:
assert
data
in
expected_data
# List directory containing a single file
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir2
.
id
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
dir2
))
for
data
in
actual_data
:
assert
data
in
expected_data
# List directory containing a known subdirectory, entries should
# only be those of the parent directory, not of the subdir
actual_data
=
list
(
swh_storage
.
directory_ls
(
dir3
.
id
))
expected_data
=
list
(
transform_entries
(
swh_storage
,
dir3
))
for
data
in
actual_data
:
assert
data
in
expected_data
def
test_directory_ls_missing_content
(
self
,
swh_storage
,
sample_data
):
swh_storage
.
directory_add
([
sample_data
.
directory2
])
assert
list
(
swh_storage
.
directory_ls
(
sample_data
.
directory2
.
id
))
==
[
{
"dir_id"
:
sample_data
.
directory2
.
id
,
"length"
:
None
,
"name"
:
b
"oof"
,
"perms"
:
33188
,
"sha1"
:
None
,
"sha1_git"
:
None
,
"sha256"
:
None
,
"status"
:
None
,
"target"
:
sample_data
.
directory2
.
entries
[
0
]
.
target
,
"type"
:
"file"
,
},
]
def
test_directory_ls_skipped_content
(
self
,
swh_storage
,
sample_data
):
swh_storage
.
directory_add
([
sample_data
.
directory2
])
cont
=
SkippedContent
(
sha1_git
=
sample_data
.
directory2
.
entries
[
0
]
.
target
,
sha1
=
b
"c"
*
20
,
sha256
=
None
,
blake2s256
=
None
,
length
=
42
,
status
=
"absent"
,
reason
=
"You need a premium subscription to access this content"
,
)
swh_storage
.
skipped_content_add
([
cont
])
assert
list
(
swh_storage
.
directory_ls
(
sample_data
.
directory2
.
id
))
==
[
{
"dir_id"
:
sample_data
.
directory2
.
id
,
"length"
:
42
,
"name"
:
b
"oof"
,
"perms"
:
33188
,
"sha1"
:
b
"c"
*
20
,
"sha1_git"
:
sample_data
.
directory2
.
entries
[
0
]
.
target
,
"sha256"
:
None
,
"status"
:
"absent"
,
"target"
:
sample_data
.
directory2
.
entries
[
0
]
.
target
,
"type"
:
"file"
,
},
]
def
test_directory_entry_get_by_path
(
self
,
swh_storage
,
sample_data
):
cont
,
content2
=
sample_data
.
contents
[:
2
]
dir1
,
dir2
,
dir3
,
dir4
,
dir5
=
sample_data
.
directories
[:
5
]
# given
dir_ids
=
[
d
.
id
for
d
in
[
dir1
,
dir2
,
dir3
,
dir4
,
dir5
]]
init_missing
=
list
(
swh_storage
.
directory_missing
(
dir_ids
))
assert
init_missing
==
dir_ids
actual_result
=
swh_storage
.
directory_add
([
dir3
,
dir4
])
assert
actual_result
==
{
"directory:add"
:
2
}
expected_entries
=
[
{
"dir_id"
:
dir3
.
id
,
"name"
:
b
"foo"
,
"type"
:
"file"
,
"target"
:
cont
.
sha1_git
,
"sha1"
:
None
,
"sha1_git"
:
None
,
"sha256"
:
None
,
"status"
:
None
,
"perms"
:
from_disk
.
DentryPerms
.
content
,
"length"
:
None
,
},
{
"dir_id"
:
dir3
.
id
,
"name"
:
b
"subdir"
,
"type"
:
"dir"
,
"target"
:
dir2
.
id
,
"sha1"
:
None
,
"sha1_git"
:
None
,
"sha256"
:
None
,
"status"
:
None
,
"perms"
:
from_disk
.
DentryPerms
.
directory
,
"length"
:
None
,
},
{
"dir_id"
:
dir3
.
id
,
"name"
:
b
"hello"
,
"type"
:
"file"
,
"target"
:
content2
.
sha1_git
,
"sha1"
:
None
,
"sha1_git"
:
None
,
"sha256"
:
None
,
"status"
:
None
,
"perms"
:
from_disk
.
DentryPerms
.
content
,
"length"
:
None
,
},
]
# when (all must be found here)
for
entry
,
expected_entry
in
zip
(
dir3
.
entries
,
expected_entries
):
actual_entry
=
swh_storage
.
directory_entry_get_by_path
(
dir3
.
id
,
[
entry
.
name
]
)
assert
actual_entry
==
expected_entry
# same, but deeper
for
entry
,
expected_entry
in
zip
(
dir3
.
entries
,
expected_entries
):
actual_entry
=
swh_storage
.
directory_entry_get_by_path
(
dir4
.
id
,
[
b
"subdir1"
,
entry
.
name
]
)
expected_entry
=
expected_entry
.
copy
()
expected_entry
[
"name"
]
=
b
"subdir1/"
+
expected_entry
[
"name"
]
assert
actual_entry
==
expected_entry
# when (nothing should be found here since `dir` is not persisted.)
for
entry
in
dir2
.
entries
:
actual_entry
=
swh_storage
.
directory_entry_get_by_path
(
dir2
.
id
,
[
entry
.
name
]
)
assert
actual_entry
is
None
def
test_directory_get_random
(
self
,
swh_storage
,
sample_data
):
dir1
,
dir2
,
dir3
=
sample_data
.
directories
[:
3
]
swh_storage
.
directory_add
([
dir1
,
dir2
,
dir3
])
assert
swh_storage
.
directory_get_random
()
in
{
dir1
.
id
,
dir2
.
id
,
dir3
.
id
,
}
def
test_revision_add
(
self
,
swh_storage
,
sample_data
):
revision
=
sample_data
.
revision
init_missing
=
swh_storage
.
revision_missing
([
revision
.
id
])
assert
list
(
init_missing
)
==
[
revision
.
id
]
actual_result
=
swh_storage
.
revision_add
([
revision
])
assert
actual_result
==
{
"revision:add"
:
1
}
end_missing
=
swh_storage
.
revision_missing
([
revision
.
id
])
assert
list
(
end_missing
)
==
[]
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"revision"
,
revision
)
]
# already there so nothing added
actual_result
=
swh_storage
.
revision_add
([
revision
])
assert
actual_result
==
{
"revision:add"
:
0
}
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"revision"
]
==
1
def
test_revision_add_twice
(
self
,
swh_storage
,
sample_data
):
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
actual_result
=
swh_storage
.
revision_add
([
revision
])
assert
actual_result
==
{
"revision:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"revision"
,
revision
)
]
actual_result
=
swh_storage
.
revision_add
([
revision
,
revision2
])
assert
actual_result
==
{
"revision:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"revision"
,
revision
),
(
"revision"
,
revision2
),
]
def
test_revision_add_name_clash
(
self
,
swh_storage
,
sample_data
):
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
revision1
=
attr
.
evolve
(
revision
,
author
=
Person
(
fullname
=
b
"John Doe <john.doe@example.com>"
,
name
=
b
"John Doe"
,
email
=
b
"john.doe@example.com"
,
),
)
revision2
=
attr
.
evolve
(
revision2
,
author
=
Person
(
fullname
=
b
"John Doe <john.doe@example.com>"
,
name
=
b
"John Doe "
,
email
=
b
"john.doe@example.com "
,
),
)
actual_result
=
swh_storage
.
revision_add
([
revision1
,
revision2
])
assert
actual_result
==
{
"revision:add"
:
2
}
def
test_revision_get_order
(
self
,
swh_storage
,
sample_data
):
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
add_result
=
swh_storage
.
revision_add
([
revision
,
revision2
])
assert
add_result
==
{
"revision:add"
:
2
}
# order 1
actual_revisions
=
swh_storage
.
revision_get
([
revision
.
id
,
revision2
.
id
])
assert
actual_revisions
==
[
revision
,
revision2
]
# order 2
actual_revisions2
=
swh_storage
.
revision_get
([
revision2
.
id
,
revision
.
id
])
assert
actual_revisions2
==
[
revision2
,
revision
]
def
test_revision_log
(
self
,
swh_storage
,
sample_data
):
revision1
,
revision2
,
revision3
,
revision4
=
sample_data
.
revisions
[:
4
]
# rev4 -is-child-of-> rev3 -> rev1, (rev2 -> rev1)
swh_storage
.
revision_add
([
revision1
,
revision2
,
revision3
,
revision4
])
# when
results
=
list
(
swh_storage
.
revision_log
([
revision4
.
id
]))
# for comparison purposes
actual_results
=
[
Revision
.
from_dict
(
r
)
for
r
in
results
]
assert
len
(
actual_results
)
==
4
# rev4 -child-> rev3 -> rev1, (rev2 -> rev1)
assert
actual_results
==
[
revision4
,
revision3
,
revision1
,
revision2
]
def
test_revision_log_with_limit
(
self
,
swh_storage
,
sample_data
):
revision1
,
revision2
,
revision3
,
revision4
=
sample_data
.
revisions
[:
4
]
# revision4 -is-child-of-> revision3
swh_storage
.
revision_add
([
revision3
,
revision4
])
results
=
list
(
swh_storage
.
revision_log
([
revision4
.
id
],
1
))
actual_results
=
[
Revision
.
from_dict
(
r
)
for
r
in
results
]
assert
len
(
actual_results
)
==
1
assert
actual_results
[
0
]
==
revision4
def
test_revision_log_unknown_revision
(
self
,
swh_storage
,
sample_data
):
revision
=
sample_data
.
revision
rev_log
=
list
(
swh_storage
.
revision_log
([
revision
.
id
]))
assert
rev_log
==
[]
def
test_revision_shortlog
(
self
,
swh_storage
,
sample_data
):
revision1
,
revision2
,
revision3
,
revision4
=
sample_data
.
revisions
[:
4
]
# rev4 -is-child-of-> rev3 -> (rev1, rev2); rev2 -> rev1
swh_storage
.
revision_add
([
revision1
,
revision2
,
revision3
,
revision4
])
results
=
list
(
swh_storage
.
revision_shortlog
([
revision4
.
id
]))
actual_results
=
[[
id
,
tuple
(
parents
)]
for
(
id
,
parents
)
in
results
]
assert
len
(
actual_results
)
==
4
assert
actual_results
==
[
[
revision4
.
id
,
revision4
.
parents
],
[
revision3
.
id
,
revision3
.
parents
],
[
revision1
.
id
,
revision1
.
parents
],
[
revision2
.
id
,
revision2
.
parents
],
]
def
test_revision_shortlog_with_limit
(
self
,
swh_storage
,
sample_data
):
revision1
,
revision2
,
revision3
,
revision4
=
sample_data
.
revisions
[:
4
]
# revision4 -is-child-of-> revision3
swh_storage
.
revision_add
([
revision1
,
revision2
,
revision3
,
revision4
])
results
=
list
(
swh_storage
.
revision_shortlog
([
revision4
.
id
],
1
))
actual_results
=
[[
id
,
tuple
(
parents
)]
for
(
id
,
parents
)
in
results
]
assert
len
(
actual_results
)
==
1
assert
list
(
actual_results
[
0
])
==
[
revision4
.
id
,
revision4
.
parents
]
def
test_revision_get
(
self
,
swh_storage
,
sample_data
):
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
swh_storage
.
revision_add
([
revision
])
actual_revisions
=
swh_storage
.
revision_get
([
revision
.
id
,
revision2
.
id
])
assert
len
(
actual_revisions
)
==
2
assert
actual_revisions
==
[
revision
,
None
]
def
test_revision_get_no_parents
(
self
,
swh_storage
,
sample_data
):
revision
=
sample_data
.
revision
swh_storage
.
revision_add
([
revision
])
actual_revision
=
swh_storage
.
revision_get
([
revision
.
id
])[
0
]
assert
revision
.
parents
==
()
assert
actual_revision
.
parents
==
()
# no parents on this one
def
test_revision_get_random
(
self
,
swh_storage
,
sample_data
):
revision1
,
revision2
,
revision3
=
sample_data
.
revisions
[:
3
]
swh_storage
.
revision_add
([
revision1
,
revision2
,
revision3
])
assert
swh_storage
.
revision_get_random
()
in
{
revision1
.
id
,
revision2
.
id
,
revision3
.
id
,
}
def
test_release_add
(
self
,
swh_storage
,
sample_data
):
release
,
release2
=
sample_data
.
releases
[:
2
]
init_missing
=
swh_storage
.
release_missing
([
release
.
id
,
release2
.
id
])
assert
list
(
init_missing
)
==
[
release
.
id
,
release2
.
id
]
actual_result
=
swh_storage
.
release_add
([
release
,
release2
])
assert
actual_result
==
{
"release:add"
:
2
}
end_missing
=
swh_storage
.
release_missing
([
release
.
id
,
release2
.
id
])
assert
list
(
end_missing
)
==
[]
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"release"
,
release
),
(
"release"
,
release2
),
]
# already present so nothing added
actual_result
=
swh_storage
.
release_add
([
release
,
release2
])
assert
actual_result
==
{
"release:add"
:
0
}
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"release"
]
==
2
def
test_release_add_no_author_date
(
self
,
swh_storage
,
sample_data
):
full_release
=
sample_data
.
release
release
=
attr
.
evolve
(
full_release
,
author
=
None
,
date
=
None
)
actual_result
=
swh_storage
.
release_add
([
release
])
assert
actual_result
==
{
"release:add"
:
1
}
end_missing
=
swh_storage
.
release_missing
([
release
.
id
])
assert
list
(
end_missing
)
==
[]
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"release"
,
release
)
]
def
test_release_add_twice
(
self
,
swh_storage
,
sample_data
):
release
,
release2
=
sample_data
.
releases
[:
2
]
actual_result
=
swh_storage
.
release_add
([
release
])
assert
actual_result
==
{
"release:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"release"
,
release
)
]
actual_result
=
swh_storage
.
release_add
([
release
,
release2
,
release
,
release2
])
assert
actual_result
==
{
"release:add"
:
1
}
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"release"
,
release
),
(
"release"
,
release2
),]
)
def
test_release_add_name_clash
(
self
,
swh_storage
,
sample_data
):
release
,
release2
=
[
attr
.
evolve
(
c
,
author
=
Person
(
fullname
=
b
"John Doe <john.doe@example.com>"
,
name
=
b
"John Doe"
,
email
=
b
"john.doe@example.com"
,
),
)
for
c
in
sample_data
.
releases
[:
2
]
]
actual_result
=
swh_storage
.
release_add
([
release
,
release2
])
assert
actual_result
==
{
"release:add"
:
2
}
def
test_release_get
(
self
,
swh_storage
,
sample_data
):
release
,
release2
,
release3
=
sample_data
.
releases
[:
3
]
# given
swh_storage
.
release_add
([
release
,
release2
])
# when
actual_releases
=
swh_storage
.
release_get
([
release
.
id
,
release2
.
id
])
# then
assert
actual_releases
==
[
release
,
release2
]
unknown_releases
=
swh_storage
.
release_get
([
release3
.
id
])
assert
unknown_releases
[
0
]
is
None
def
test_release_get_order
(
self
,
swh_storage
,
sample_data
):
release
,
release2
=
sample_data
.
releases
[:
2
]
add_result
=
swh_storage
.
release_add
([
release
,
release2
])
assert
add_result
==
{
"release:add"
:
2
}
# order 1
actual_releases
=
swh_storage
.
release_get
([
release
.
id
,
release2
.
id
])
assert
actual_releases
==
[
release
,
release2
]
# order 2
actual_releases2
=
swh_storage
.
release_get
([
release2
.
id
,
release
.
id
])
assert
actual_releases2
==
[
release2
,
release
]
def
test_release_get_random
(
self
,
swh_storage
,
sample_data
):
release
,
release2
,
release3
=
sample_data
.
releases
[:
3
]
swh_storage
.
release_add
([
release
,
release2
,
release3
])
assert
swh_storage
.
release_get_random
()
in
{
release
.
id
,
release2
.
id
,
release3
.
id
,
}
def
test_origin_add
(
self
,
swh_storage
,
sample_data
):
origins
=
list
(
sample_data
.
origins
[:
2
])
origin_urls
=
[
o
.
url
for
o
in
origins
]
assert
swh_storage
.
origin_get
(
origin_urls
)
==
[
None
,
None
]
stats
=
swh_storage
.
origin_add
(
origins
)
assert
stats
==
{
"origin:add"
:
2
}
actual_origins
=
swh_storage
.
origin_get
(
origin_urls
)
assert
actual_origins
==
origins
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"origin"
,
origins
[
0
]),
(
"origin"
,
origins
[
1
]),]
)
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"origin"
]
==
2
def
test_origin_add_twice
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
add1
=
swh_storage
.
origin_add
([
origin
,
origin2
])
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"origin"
,
origin
),
(
"origin"
,
origin2
),]
)
assert
add1
==
{
"origin:add"
:
2
}
add2
=
swh_storage
.
origin_add
([
origin
,
origin2
])
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"origin"
,
origin
),
(
"origin"
,
origin2
),]
)
assert
add2
==
{
"origin:add"
:
0
}
def
test_origin_add_twice_at_once
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
add1
=
swh_storage
.
origin_add
([
origin
,
origin2
,
origin
,
origin2
])
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"origin"
,
origin
),
(
"origin"
,
origin2
),]
)
assert
add1
==
{
"origin:add"
:
2
}
add2
=
swh_storage
.
origin_add
([
origin
,
origin2
,
origin
,
origin2
])
assert
set
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
set
(
[(
"origin"
,
origin
),
(
"origin"
,
origin2
),]
)
assert
add2
==
{
"origin:add"
:
0
}
def
test_origin_get
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
assert
swh_storage
.
origin_get
([
origin
.
url
])
==
[
None
]
swh_storage
.
origin_add
([
origin
])
actual_origins
=
swh_storage
.
origin_get
([
origin
.
url
])
assert
actual_origins
==
[
origin
]
actual_origins
=
swh_storage
.
origin_get
([
origin
.
url
,
"not://exists"
])
assert
actual_origins
==
[
origin
,
None
]
def
_generate_random_visits
(
self
,
nb_visits
=
100
,
start
=
0
,
end
=
7
):
"""Generate random visits within the last 2 months (to avoid
computations)
"""
visits
=
[]
today
=
now
()
for
weeks
in
range
(
nb_visits
,
0
,
-
1
):
hours
=
random
.
randint
(
0
,
24
)
minutes
=
random
.
randint
(
0
,
60
)
seconds
=
random
.
randint
(
0
,
60
)
days
=
random
.
randint
(
0
,
28
)
weeks
=
random
.
randint
(
start
,
end
)
date_visit
=
today
-
timedelta
(
weeks
=
weeks
,
hours
=
hours
,
minutes
=
minutes
,
seconds
=
seconds
,
days
=
days
)
visits
.
append
(
date_visit
)
return
visits
def
test_origin_visit_get__unknown_origin
(
self
,
swh_storage
):
actual_page
=
swh_storage
.
origin_visit_get
(
"foo"
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
assert
actual_page
==
PagedResult
()
def
test_origin_visit_get__validation_failure
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"page_token must be a string"
):
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
10
)
# not bytes
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"order must be a ListOrder value"
):
swh_storage
.
origin_visit_get
(
origin
.
url
,
order
=
"foobar"
)
# wrong order
def
test_origin_visit_get_all
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
ov1
,
ov2
,
ov3
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
),
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
),
]
)
# order asc, no token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov1
,
ov2
,
ov3
]
# order asc, no token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
limit
=
2
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov1
,
ov2
]
# order asc, token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov3
]
# order asc, no token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov1
]
# order asc, token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov2
,
ov3
]
# order asc, token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
limit
=
2
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov2
,
ov3
]
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov2
]
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
limit
=
1
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov3
]
# order desc, no token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov3
,
ov2
,
ov1
]
# order desc, no token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
limit
=
2
,
order
=
ListOrder
.
DESC
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov3
,
ov2
]
# order desc, token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov1
]
# order desc, no token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
limit
=
1
,
order
=
ListOrder
.
DESC
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov3
]
# order desc, token, no limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov2
,
ov1
]
# order desc, token, limit
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ov2
]
actual_page
=
swh_storage
.
origin_visit_get
(
origin
.
url
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ov1
]
def
test_origin_visit_status_get__unknown_cases
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
actual_page
=
swh_storage
.
origin_visit_status_get
(
"foobar"
,
1
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
1
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
ov1
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
]
)[
0
]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
+
10
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
def
test_origin_visit_status_get_all
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
date_visit3
=
round_to_milliseconds
(
now
())
date_visit1
=
date_visit3
-
datetime
.
timedelta
(
hours
=
2
)
date_visit2
=
date_visit3
-
datetime
.
timedelta
(
hours
=
1
)
assert
date_visit1
<
date_visit2
<
date_visit3
ov1
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
]
)[
0
]
ovs1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit1
,
type
=
ov1
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
ovs2
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit2
,
type
=
ov1
.
type
,
status
=
"partial"
,
snapshot
=
None
,
)
ovs3
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit3
,
type
=
ov1
.
type
,
status
=
"full"
,
snapshot
=
sample_data
.
snapshot
.
id
,
metadata
=
{},
)
swh_storage
.
origin_visit_status_add
([
ovs2
,
ovs3
])
# order asc, no token, no limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs1
,
ovs2
,
ovs3
]
# order asc, no token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
limit
=
2
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs1
,
ovs2
]
# order asc, token, no limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs3
]
# order asc, no token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs1
]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs2
,
ovs3
]
# order asc, token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
limit
=
2
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs2
,
ovs3
]
# order asc, no token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
limit
=
2
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs1
,
ovs2
]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
limit
=
1
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs3
]
# order desc, no token, no limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs3
,
ovs2
,
ovs1
]
# order desc, no token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
limit
=
2
,
order
=
ListOrder
.
DESC
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs3
,
ovs2
]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs1
]
# order desc, no token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
order
=
ListOrder
.
DESC
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs3
]
# order desc, token, no limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs2
,
ovs1
]
# order desc, token, limit
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
,
limit
=
1
,
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
ovs2
]
actual_page
=
swh_storage
.
origin_visit_status_get
(
origin
.
url
,
ov1
.
visit
,
page_token
=
next_page_token
,
order
=
ListOrder
.
DESC
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
ovs1
]
def
test_origin_visit_status_get_random
(
self
,
swh_storage
,
sample_data
):
origins
=
sample_data
.
origins
[:
2
]
swh_storage
.
origin_add
(
origins
)
# Add some random visits within the selection range
visits
=
self
.
_generate_random_visits
()
visit_type
=
"git"
# Add visits to those origins
for
origin
in
origins
:
for
date_visit
in
visits
:
visit
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
date_visit
,
type
=
visit_type
,)]
)[
0
]
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
visit
.
visit
,
date
=
now
(),
status
=
"full"
,
snapshot
=
None
,
)
]
)
swh_storage
.
refresh_stat_counters
()
stats
=
swh_storage
.
stat_counters
()
assert
stats
[
"origin"
]
==
len
(
origins
)
assert
stats
[
"origin_visit"
]
==
len
(
origins
)
*
len
(
visits
)
random_ovs
=
swh_storage
.
origin_visit_status_get_random
(
visit_type
)
assert
random_ovs
assert
random_ovs
.
origin
is
not
None
assert
random_ovs
.
origin
in
[
o
.
url
for
o
in
origins
]
assert
random_ovs
.
type
is
not
None
def
test_origin_visit_status_get_random_nothing_found
(
self
,
swh_storage
,
sample_data
):
origins
=
sample_data
.
origins
swh_storage
.
origin_add
(
origins
)
visit_type
=
"hg"
# Add some visits outside of the random generation selection so nothing
# will be found by the random selection
visits
=
self
.
_generate_random_visits
(
nb_visits
=
3
,
start
=
13
,
end
=
24
)
for
origin
in
origins
:
for
date_visit
in
visits
:
visit
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
date_visit
,
type
=
visit_type
,)]
)[
0
]
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
visit
.
visit
,
date
=
now
(),
status
=
"full"
,
snapshot
=
None
,
)
]
)
random_origin_visit
=
swh_storage
.
origin_visit_status_get_random
(
visit_type
)
assert
random_origin_visit
is
None
def
test_origin_get_by_sha1
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
assert
swh_storage
.
origin_get
([
origin
.
url
])[
0
]
is
None
swh_storage
.
origin_add
([
origin
])
origins
=
list
(
swh_storage
.
origin_get_by_sha1
([
sha1
(
origin
.
url
)]))
assert
len
(
origins
)
==
1
assert
origins
[
0
][
"url"
]
==
origin
.
url
def
test_origin_get_by_sha1_not_found
(
self
,
swh_storage
,
sample_data
):
unknown_origin
=
sample_data
.
origin
assert
swh_storage
.
origin_get
([
unknown_origin
.
url
])[
0
]
is
None
origins
=
list
(
swh_storage
.
origin_get_by_sha1
([
sha1
(
unknown_origin
.
url
)]))
assert
len
(
origins
)
==
1
assert
origins
[
0
]
is
None
def
test_origin_search_single_result
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
actual_page
=
swh_storage
.
origin_search
(
origin
.
url
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
actual_page
=
swh_storage
.
origin_search
(
origin
.
url
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[]
swh_storage
.
origin_add
([
origin
])
actual_page
=
swh_storage
.
origin_search
(
origin
.
url
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin
]
actual_page
=
swh_storage
.
origin_search
(
f
".{origin.url[1:-1]}."
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin
]
swh_storage
.
origin_add
([
origin2
])
actual_page
=
swh_storage
.
origin_search
(
origin2
.
url
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin2
]
actual_page
=
swh_storage
.
origin_search
(
f
".{origin2.url[1:-1]}."
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin2
]
def
test_origin_search_no_regexp
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
swh_storage
.
origin_add
([
origin
,
origin2
])
# no pagination
actual_page
=
swh_storage
.
origin_search
(
"/"
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin
,
origin2
]
# offset=0
actual_page
=
swh_storage
.
origin_search
(
"/"
,
page_token
=
None
,
limit
=
1
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
origin
]
# offset=1
actual_page
=
swh_storage
.
origin_search
(
"/"
,
page_token
=
next_page_token
,
limit
=
1
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin2
]
def
test_origin_search_regexp_substring
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
swh_storage
.
origin_add
([
origin
,
origin2
])
# no pagination
actual_page
=
swh_storage
.
origin_search
(
"/"
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin
,
origin2
]
# offset=0
actual_page
=
swh_storage
.
origin_search
(
"/"
,
page_token
=
None
,
limit
=
1
,
regexp
=
True
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
origin
]
# offset=1
actual_page
=
swh_storage
.
origin_search
(
"/"
,
page_token
=
next_page_token
,
limit
=
1
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin2
]
def
test_origin_search_regexp_fullstring
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
swh_storage
.
origin_add
([
origin
,
origin2
])
# no pagination
actual_page
=
swh_storage
.
origin_search
(
".*/.*"
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin
,
origin2
]
# offset=0
actual_page
=
swh_storage
.
origin_search
(
".*/.*"
,
page_token
=
None
,
limit
=
1
,
regexp
=
True
)
next_page_token
=
actual_page
.
next_page_token
assert
next_page_token
is
not
None
assert
actual_page
.
results
==
[
origin
]
# offset=1
actual_page
=
swh_storage
.
origin_search
(
".*/.*"
,
page_token
=
next_page_token
,
limit
=
1
,
regexp
=
True
)
assert
actual_page
.
next_page_token
is
None
assert
actual_page
.
results
==
[
origin2
]
def
test_origin_visit_add
(
self
,
swh_storage
,
sample_data
):
origin1
=
sample_data
.
origins
[
1
]
swh_storage
.
origin_add
([
origin1
])
date_visit
=
now
()
date_visit2
=
date_visit
+
datetime
.
timedelta
(
minutes
=
1
)
date_visit
=
round_to_milliseconds
(
date_visit
)
date_visit2
=
round_to_milliseconds
(
date_visit2
)
visit1
=
OriginVisit
(
origin
=
origin1
.
url
,
date
=
date_visit
,
type
=
sample_data
.
type_visit1
,
)
visit2
=
OriginVisit
(
origin
=
origin1
.
url
,
date
=
date_visit2
,
type
=
sample_data
.
type_visit2
,
)
# add once
ov1
,
ov2
=
swh_storage
.
origin_visit_add
([
visit1
,
visit2
])
# then again (will be ignored as they already exist)
origin_visit1
,
origin_visit2
=
swh_storage
.
origin_visit_add
([
ov1
,
ov2
])
assert
ov1
==
origin_visit1
assert
ov2
==
origin_visit2
ovs1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit
,
type
=
ov1
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
ovs2
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
date_visit2
,
type
=
ov2
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
actual_visits
=
swh_storage
.
origin_visit_get
(
origin1
.
url
)
.
results
expected_visits
=
[
ov1
,
ov2
]
assert
len
(
expected_visits
)
==
len
(
actual_visits
)
for
visit
in
expected_visits
:
assert
visit
in
actual_visits
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
list
(
[(
"origin"
,
origin1
)]
+
[(
"origin_visit"
,
visit
)
for
visit
in
expected_visits
]
*
2
+
[(
"origin_visit_status"
,
ovs
)
for
ovs
in
[
ovs1
,
ovs2
]]
)
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_origin_visit_add_validation
(
self
,
swh_storage
,
sample_data
):
"""Unknown origin when adding visits should raise"""
visit
=
attr
.
evolve
(
sample_data
.
origin_visit
,
origin
=
"something-unknonw"
)
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"Unknown origin"
):
swh_storage
.
origin_visit_add
([
visit
])
objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
assert
not
objects
def
test_origin_visit_status_add_validation
(
self
,
swh_storage
):
"""Wrong origin_visit_status input should raise storage argument error"""
date_visit
=
now
()
visit_status1
=
OriginVisitStatus
(
origin
=
"unknown-origin-url"
,
visit
=
10
,
date
=
date_visit
,
status
=
"full"
,
snapshot
=
None
,
)
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"Unknown origin"
):
swh_storage
.
origin_visit_status_add
([
visit_status1
])
objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
assert
not
objects
def
test_origin_visit_status_add
(
self
,
swh_storage
,
sample_data
):
"""Correct origin visit statuses should add a new visit status
"""
snapshot
=
sample_data
.
snapshot
origin1
=
sample_data
.
origins
[
1
]
origin2
=
Origin
(
url
=
"new-origin"
)
swh_storage
.
origin_add
([
origin1
,
origin2
])
ov1
,
ov2
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin1
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
OriginVisit
(
origin
=
origin2
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
),
]
)
ovs1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
sample_data
.
date_visit1
,
type
=
ov1
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
ovs2
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
sample_data
.
date_visit2
,
type
=
ov2
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
date_visit_now
=
round_to_milliseconds
(
now
())
visit_status1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit_now
,
type
=
ov1
.
type
,
status
=
"full"
,
snapshot
=
snapshot
.
id
,
)
date_visit_now
=
round_to_milliseconds
(
now
())
visit_status2
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
date_visit_now
,
type
=
ov2
.
type
,
status
=
"ongoing"
,
snapshot
=
None
,
metadata
=
{
"intrinsic"
:
"something"
},
)
swh_storage
.
origin_visit_status_add
([
visit_status1
,
visit_status2
])
visit
=
swh_storage
.
origin_visit_get_latest
(
origin1
.
url
,
require_snapshot
=
True
)
visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
visit
.
visit
,
require_snapshot
=
True
)
assert
visit_status
==
visit_status1
visit
=
swh_storage
.
origin_visit_get_latest
(
origin2
.
url
,
require_snapshot
=
False
)
visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin2
.
url
,
visit
.
visit
,
require_snapshot
=
False
)
assert
origin2
.
url
!=
origin1
.
url
assert
visit_status
==
visit_status2
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_origins
=
[
origin1
,
origin2
]
expected_visits
=
[
ov1
,
ov2
]
expected_visit_statuses
=
[
ovs1
,
ovs2
,
visit_status1
,
visit_status2
]
expected_objects
=
(
[(
"origin"
,
o
)
for
o
in
expected_origins
]
+
[(
"origin_visit"
,
v
)
for
v
in
expected_visits
]
+
[(
"origin_visit_status"
,
ovs
)
for
ovs
in
expected_visit_statuses
]
)
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_origin_visit_status_add_twice
(
self
,
swh_storage
,
sample_data
):
"""Correct origin visit statuses should add a new visit status
"""
snapshot
=
sample_data
.
snapshot
origin1
=
sample_data
.
origins
[
1
]
swh_storage
.
origin_add
([
origin1
])
ov1
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin1
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
]
)[
0
]
ovs1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
sample_data
.
date_visit1
,
type
=
ov1
.
type
,
status
=
"created"
,
snapshot
=
None
,
)
date_visit_now
=
round_to_milliseconds
(
now
())
visit_status1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_visit_now
,
type
=
ov1
.
type
,
status
=
"full"
,
snapshot
=
snapshot
.
id
,
)
swh_storage
.
origin_visit_status_add
([
visit_status1
])
# second call will ignore existing entries (will send to storage though)
swh_storage
.
origin_visit_status_add
([
visit_status1
])
visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
ov1
.
origin
,
ov1
.
visit
)
assert
visit_status
==
visit_status1
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_origins
=
[
origin1
]
expected_visits
=
[
ov1
]
expected_visit_statuses
=
[
ovs1
,
visit_status1
,
visit_status1
]
# write twice in the journal
expected_objects
=
(
[(
"origin"
,
o
)
for
o
in
expected_origins
]
+
[(
"origin_visit"
,
v
)
for
v
in
expected_visits
]
+
[(
"origin_visit_status"
,
ovs
)
for
ovs
in
expected_visit_statuses
]
)
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_origin_visit_find_by_date
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit1
,
)
visit2
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit3
,
type
=
sample_data
.
type_visit2
,
)
visit3
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit3
,
)
ov1
,
ov2
,
ov3
=
swh_storage
.
origin_visit_add
([
visit1
,
visit2
,
visit3
])
ovs1
=
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov1
.
visit
,
date
=
sample_data
.
date_visit2
,
status
=
"ongoing"
,
snapshot
=
None
,
)
ovs2
=
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov2
.
visit
,
date
=
sample_data
.
date_visit3
,
status
=
"ongoing"
,
snapshot
=
None
,
)
ovs3
=
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov3
.
visit
,
date
=
sample_data
.
date_visit2
,
status
=
"ongoing"
,
snapshot
=
None
,
)
swh_storage
.
origin_visit_status_add
([
ovs1
,
ovs2
,
ovs3
])
# Simple case
actual_visit
=
swh_storage
.
origin_visit_find_by_date
(
origin
.
url
,
sample_data
.
date_visit3
)
assert
actual_visit
==
ov2
# There are two visits at the same date, the latest must be returned
actual_visit
=
swh_storage
.
origin_visit_find_by_date
(
origin
.
url
,
sample_data
.
date_visit2
)
assert
actual_visit
==
ov3
def
test_origin_visit_find_by_date__unknown_origin
(
self
,
swh_storage
,
sample_data
):
actual_visit
=
swh_storage
.
origin_visit_find_by_date
(
"foo"
,
sample_data
.
date_visit2
)
assert
actual_visit
is
None
def
test_origin_visit_get_by
(
self
,
swh_storage
,
sample_data
):
snapshot
=
sample_data
.
snapshot
origins
=
sample_data
.
origins
[:
2
]
swh_storage
.
origin_add
(
origins
)
origin_url
,
origin_url2
=
[
o
.
url
for
o
in
origins
]
visit
=
OriginVisit
(
origin
=
origin_url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
)
origin_visit1
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
swh_storage
.
snapshot_add
([
snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin_url
,
visit
=
origin_visit1
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
snapshot
.
id
,
)
]
)
# Add some other {origin, visit} entries
visit2
=
OriginVisit
(
origin
=
origin_url
,
date
=
sample_data
.
date_visit3
,
type
=
sample_data
.
type_visit3
,
)
visit3
=
OriginVisit
(
origin
=
origin_url2
,
date
=
sample_data
.
date_visit3
,
type
=
sample_data
.
type_visit3
,
)
swh_storage
.
origin_visit_add
([
visit2
,
visit3
])
# when
visit1_metadata
=
{
"contents"
:
42
,
"directories"
:
22
,
}
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin_url
,
visit
=
origin_visit1
.
visit
,
date
=
now
(),
status
=
"full"
,
snapshot
=
snapshot
.
id
,
metadata
=
visit1_metadata
,
)
]
)
actual_visit
=
swh_storage
.
origin_visit_get_by
(
origin_url
,
origin_visit1
.
visit
)
assert
actual_visit
==
origin_visit1
def
test_origin_visit_get_by__no_result
(
self
,
swh_storage
,
sample_data
):
actual_visit
=
swh_storage
.
origin_visit_get_by
(
"unknown"
,
10
)
# unknown origin
assert
actual_visit
is
None
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
actual_visit
=
swh_storage
.
origin_visit_get_by
(
origin
.
url
,
999
)
# unknown visit
assert
actual_visit
is
None
def
test_origin_visit_get_latest_edge_cases
(
self
,
swh_storage
,
sample_data
):
# unknown origin so no result
assert
swh_storage
.
origin_visit_get_latest
(
"unknown-origin"
)
is
None
# unknown type so no result
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
assert
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
type
=
"unknown"
)
is
None
# unknown allowed statuses should raise
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"Unknown allowed statuses"
):
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"unknown"
]
)
def
test_origin_visit_get_latest_filter_type
(
self
,
swh_storage
,
sample_data
):
"""Filtering origin visit get latest with filter type should be ok
"""
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
"git"
,
)
visit2
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
"hg"
,
)
date_now
=
round_to_milliseconds
(
now
())
visit3
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
date_now
,
type
=
"hg"
,)
assert
sample_data
.
date_visit1
<
sample_data
.
date_visit2
assert
sample_data
.
date_visit2
<
date_now
ov1
,
ov2
,
ov3
=
swh_storage
.
origin_visit_add
([
visit1
,
visit2
,
visit3
])
# Check type filter is ok
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
type
=
"git"
)
assert
actual_visit
==
ov1
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
type
=
"hg"
)
assert
actual_visit
==
ov3
actual_visit_unknown_type
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
type
=
"npm"
,
# no visit matching that type
)
assert
actual_visit_unknown_type
is
None
def
test_origin_visit_get_latest
(
self
,
swh_storage
,
sample_data
):
empty_snapshot
,
complete_snapshot
=
sample_data
.
snapshots
[
1
:
3
]
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
"git"
,
)
visit2
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
"hg"
,
)
date_now
=
round_to_milliseconds
(
now
())
visit3
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
date_now
,
type
=
"hg"
,)
assert
visit1
.
date
<
visit2
.
date
assert
visit2
.
date
<
visit3
.
date
ov1
,
ov2
,
ov3
=
swh_storage
.
origin_visit_add
([
visit1
,
visit2
,
visit3
])
# no filters, latest visit is the last one (whose date is most recent)
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_visit
==
ov3
# 3 visits, none has snapshot so nothing is returned
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
True
)
assert
actual_visit
is
None
# visit are created with "created" status, so nothing will get returned
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"partial"
]
)
assert
actual_visit
is
None
# visit are created with "created" status, so most recent again
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"created"
]
)
assert
actual_visit
==
ov3
# Add snapshot to visit1; require_snapshot=True makes it return first visit
swh_storage
.
snapshot_add
([
complete_snapshot
])
visit_status_with_snapshot
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
round_to_milliseconds
(
now
()),
type
=
ov1
.
type
,
status
=
"ongoing"
,
snapshot
=
complete_snapshot
.
id
,
)
swh_storage
.
origin_visit_status_add
([
visit_status_with_snapshot
])
# only the first visit has a snapshot now
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
True
)
assert
actual_visit
==
ov1
# only the first visit has a status ongoing now
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"ongoing"
]
)
assert
actual_visit
==
ov1
actual_visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
ov1
.
visit
,
require_snapshot
=
True
)
assert
actual_visit_status
==
visit_status_with_snapshot
# ... and require_snapshot=False (defaults) still returns latest visit (3rd)
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
False
)
assert
actual_visit
==
ov3
# no specific filter, this returns as before the latest visit
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_visit
==
ov3
# Status filter: all three visits are status=ongoing, so no visit
# returned
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"full"
]
)
assert
actual_visit
is
None
visit_status1_full
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
round_to_milliseconds
(
now
()),
type
=
ov1
.
type
,
status
=
"full"
,
snapshot
=
complete_snapshot
.
id
,
)
# Mark the first visit as completed and check status filter again
swh_storage
.
origin_visit_status_add
([
visit_status1_full
])
# only the first visit has the full status
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"full"
]
)
assert
actual_visit
==
ov1
actual_visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
ov1
.
visit
,
allowed_statuses
=
[
"full"
]
)
assert
actual_visit_status
==
visit_status1_full
# no specific filter, this returns as before the latest visit
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_visit
==
ov3
# Add snapshot to visit2 and check that the new snapshot is returned
swh_storage
.
snapshot_add
([
empty_snapshot
])
visit_status2_full
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
round_to_milliseconds
(
now
()),
type
=
ov2
.
type
,
status
=
"ongoing"
,
snapshot
=
empty_snapshot
.
id
,
)
swh_storage
.
origin_visit_status_add
([
visit_status2_full
])
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
True
)
# 2nd visit is most recent with a snapshot
assert
actual_visit
==
ov2
actual_visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
ov2
.
visit
,
require_snapshot
=
True
)
assert
actual_visit_status
==
visit_status2_full
# no specific filter, this returns as before the latest visit, 3rd one
actual_origin
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_origin
==
ov3
# full status is still the first visit
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"full"
]
)
assert
actual_visit
==
ov1
# Add snapshot to visit3 (same date as visit2)
visit_status3_with_snapshot
=
OriginVisitStatus
(
origin
=
ov3
.
origin
,
visit
=
ov3
.
visit
,
date
=
round_to_milliseconds
(
now
()),
type
=
ov3
.
type
,
status
=
"ongoing"
,
snapshot
=
complete_snapshot
.
id
,
)
swh_storage
.
origin_visit_status_add
([
visit_status3_with_snapshot
])
# full status is still the first visit
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"full"
],
require_snapshot
=
True
,
)
assert
actual_visit
==
ov1
actual_visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
visit
=
actual_visit
.
visit
,
allowed_statuses
=
[
"full"
],
require_snapshot
=
True
,
)
assert
actual_visit_status
==
visit_status1_full
# most recent is still the 3rd visit
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_visit
==
ov3
# 3rd visit has a snapshot now, so it's elected
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
True
)
assert
actual_visit
==
ov3
actual_visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
ov3
.
visit
,
require_snapshot
=
True
)
assert
actual_visit_status
==
visit_status3_with_snapshot
def
test_origin_visit_get_latest__same_date
(
self
,
swh_storage
,
sample_data
):
empty_snapshot
,
complete_snapshot
=
sample_data
.
snapshots
[
1
:
3
]
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
"git"
,
)
visit2
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
"hg"
,
)
ov1
,
ov2
=
swh_storage
.
origin_visit_add
([
visit1
,
visit2
])
# ties should be broken by using the visit id
actual_visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
assert
actual_visit
==
ov2
def
test_origin_visit_get_latest__not_last
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
,
visit2
=
sample_data
.
origin_visits
[:
2
]
assert
visit1
.
origin
==
origin
.
url
swh_storage
.
origin_visit_add
([
visit1
])
ov1
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
)
# Add snapshot to visit1, latest snapshot = visit 1 snapshot
complete_snapshot
=
sample_data
.
snapshots
[
2
]
swh_storage
.
snapshot_add
([
complete_snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov1
.
visit
,
date
=
visit2
.
date
,
status
=
"partial"
,
snapshot
=
None
,
)
]
)
assert
visit1
.
date
<
visit2
.
date
# no snapshot associated to the visit, so None
visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
allowed_statuses
=
[
"partial"
],
require_snapshot
=
True
,
)
assert
visit
is
None
date_now
=
now
()
assert
visit2
.
date
<
date_now
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov1
.
visit
,
date
=
date_now
,
status
=
"full"
,
snapshot
=
complete_snapshot
.
id
,
)
]
)
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
now
(),
type
=
visit1
.
type
,)]
)
visit
=
swh_storage
.
origin_visit_get_latest
(
origin
.
url
,
require_snapshot
=
True
)
assert
visit
is
not
None
def
test_origin_visit_status_get_latest__validation
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit1
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
"git"
,
)
# unknown allowed statuses should raise
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"Unknown allowed statuses"
):
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
visit1
.
visit
,
allowed_statuses
=
[
"unknown"
]
)
def
test_origin_visit_status_get_latest
(
self
,
swh_storage
,
sample_data
):
snapshot
=
sample_data
.
snapshots
[
2
]
origin1
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin1
])
# to have some reference visits
ov1
,
ov2
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin1
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
),
OriginVisit
(
origin
=
origin1
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
),
]
)
swh_storage
.
snapshot_add
([
snapshot
])
date_now
=
round_to_milliseconds
(
now
())
assert
sample_data
.
date_visit1
<
sample_data
.
date_visit2
assert
sample_data
.
date_visit2
<
date_now
ovs1
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
sample_data
.
date_visit1
,
type
=
ov1
.
type
,
status
=
"partial"
,
snapshot
=
None
,
)
ovs2
=
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
sample_data
.
date_visit2
,
type
=
ov1
.
type
,
status
=
"ongoing"
,
snapshot
=
None
,
)
ovs3
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
sample_data
.
date_visit2
+
datetime
.
timedelta
(
minutes
=
1
),
# to not be ignored
type
=
ov2
.
type
,
status
=
"ongoing"
,
snapshot
=
None
,
)
ovs4
=
OriginVisitStatus
(
origin
=
ov2
.
origin
,
visit
=
ov2
.
visit
,
date
=
date_now
,
type
=
ov2
.
type
,
status
=
"full"
,
snapshot
=
snapshot
.
id
,
metadata
=
{
"something"
:
"wicked"
},
)
swh_storage
.
origin_visit_status_add
([
ovs1
,
ovs2
,
ovs3
,
ovs4
])
# unknown origin so no result
actual_origin_visit
=
swh_storage
.
origin_visit_status_get_latest
(
"unknown-origin"
,
ov1
.
visit
)
assert
actual_origin_visit
is
None
# unknown visit so no result
actual_origin_visit
=
swh_storage
.
origin_visit_status_get_latest
(
ov1
.
origin
,
ov1
.
visit
+
10
)
assert
actual_origin_visit
is
None
# Two visits, both with no snapshot, take the most recent
actual_origin_visit2
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov1
.
visit
)
assert
isinstance
(
actual_origin_visit2
,
OriginVisitStatus
)
assert
actual_origin_visit2
==
ovs2
assert
ovs2
.
origin
==
origin1
.
url
assert
ovs2
.
visit
==
ov1
.
visit
actual_origin_visit
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov1
.
visit
,
require_snapshot
=
True
)
# there is no visit with snapshot yet for that visit
assert
actual_origin_visit
is
None
actual_origin_visit2
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov1
.
visit
,
allowed_statuses
=
[
"partial"
,
"ongoing"
]
)
# visit status with partial status visit elected
assert
actual_origin_visit2
==
ovs2
assert
actual_origin_visit2
.
status
==
"ongoing"
actual_origin_visit4
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov2
.
visit
,
require_snapshot
=
True
)
assert
actual_origin_visit4
==
ovs4
assert
actual_origin_visit4
.
snapshot
==
snapshot
.
id
actual_origin_visit
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov2
.
visit
,
require_snapshot
=
True
,
allowed_statuses
=
[
"ongoing"
]
)
# nothing matches so nothing
assert
actual_origin_visit
is
None
# there is no visit with status full
actual_origin_visit3
=
swh_storage
.
origin_visit_status_get_latest
(
origin1
.
url
,
ov2
.
visit
,
allowed_statuses
=
[
"ongoing"
]
)
assert
actual_origin_visit3
==
ovs3
def
test_person_fullname_unicity
(
self
,
swh_storage
,
sample_data
):
revision
,
rev2
=
sample_data
.
revisions
[
0
:
2
]
# create a revision with same committer fullname but wo name and email
revision2
=
attr
.
evolve
(
rev2
,
committer
=
Person
(
fullname
=
revision
.
committer
.
fullname
,
name
=
None
,
email
=
None
),
)
swh_storage
.
revision_add
([
revision
,
revision2
])
# when getting added revisions
revisions
=
swh_storage
.
revision_get
([
revision
.
id
,
revision2
.
id
])
# then check committers are the same
assert
revisions
[
0
]
.
committer
==
revisions
[
1
]
.
committer
def
test_snapshot_add_get_empty
(
self
,
swh_storage
,
sample_data
):
empty_snapshot
=
sample_data
.
snapshots
[
1
]
empty_snapshot_dict
=
empty_snapshot
.
to_dict
()
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
ov1
=
swh_storage
.
origin_visit_add
(
[
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
)
]
)[
0
]
actual_result
=
swh_storage
.
snapshot_add
([
empty_snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
1
}
date_now
=
now
()
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
ov1
.
origin
,
visit
=
ov1
.
visit
,
date
=
date_now
,
type
=
ov1
.
type
,
status
=
"full"
,
snapshot
=
empty_snapshot
.
id
,
)
]
)
by_id
=
swh_storage
.
snapshot_get
(
empty_snapshot
.
id
)
assert
by_id
==
{
**
empty_snapshot_dict
,
"next_branch"
:
None
}
ovs1
=
OriginVisitStatus
.
from_dict
(
{
"origin"
:
ov1
.
origin
,
"date"
:
sample_data
.
date_visit1
,
"type"
:
ov1
.
type
,
"visit"
:
ov1
.
visit
,
"status"
:
"created"
,
"snapshot"
:
None
,
"metadata"
:
None
,
}
)
ovs2
=
OriginVisitStatus
.
from_dict
(
{
"origin"
:
ov1
.
origin
,
"date"
:
date_now
,
"type"
:
ov1
.
type
,
"visit"
:
ov1
.
visit
,
"status"
:
"full"
,
"metadata"
:
None
,
"snapshot"
:
empty_snapshot
.
id
,
}
)
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
[
(
"origin"
,
origin
),
(
"origin_visit"
,
ov1
),
(
"origin_visit_status"
,
ovs1
,),
(
"snapshot"
,
empty_snapshot
),
(
"origin_visit_status"
,
ovs2
,),
]
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_snapshot_add_get_complete
(
self
,
swh_storage
,
sample_data
):
complete_snapshot
=
sample_data
.
snapshots
[
2
]
complete_snapshot_dict
=
complete_snapshot
.
to_dict
()
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
)
origin_visit1
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
actual_result
=
swh_storage
.
snapshot_add
([
complete_snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
origin_visit1
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
complete_snapshot
.
id
,
)
]
)
assert
actual_result
==
{
"snapshot:add"
:
1
}
by_id
=
swh_storage
.
snapshot_get
(
complete_snapshot
.
id
)
assert
by_id
==
{
**
complete_snapshot_dict
,
"next_branch"
:
None
}
def
test_snapshot_add_many
(
self
,
swh_storage
,
sample_data
):
snapshot
,
_
,
complete_snapshot
=
sample_data
.
snapshots
[:
3
]
actual_result
=
swh_storage
.
snapshot_add
([
snapshot
,
complete_snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
2
}
assert
swh_storage
.
snapshot_get
(
complete_snapshot
.
id
)
==
{
**
complete_snapshot
.
to_dict
(),
"next_branch"
:
None
,
}
assert
swh_storage
.
snapshot_get
(
snapshot
.
id
)
==
{
**
snapshot
.
to_dict
(),
"next_branch"
:
None
,
}
swh_storage
.
refresh_stat_counters
()
assert
swh_storage
.
stat_counters
()[
"snapshot"
]
==
2
def
test_snapshot_add_many_incremental
(
self
,
swh_storage
,
sample_data
):
snapshot
,
_
,
complete_snapshot
=
sample_data
.
snapshots
[:
3
]
actual_result
=
swh_storage
.
snapshot_add
([
complete_snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
1
}
actual_result2
=
swh_storage
.
snapshot_add
([
snapshot
,
complete_snapshot
])
assert
actual_result2
==
{
"snapshot:add"
:
1
}
assert
swh_storage
.
snapshot_get
(
complete_snapshot
.
id
)
==
{
**
complete_snapshot
.
to_dict
(),
"next_branch"
:
None
,
}
assert
swh_storage
.
snapshot_get
(
snapshot
.
id
)
==
{
**
snapshot
.
to_dict
(),
"next_branch"
:
None
,
}
def
test_snapshot_add_twice
(
self
,
swh_storage
,
sample_data
):
snapshot
,
empty_snapshot
=
sample_data
.
snapshots
[:
2
]
actual_result
=
swh_storage
.
snapshot_add
([
empty_snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"snapshot"
,
empty_snapshot
)
]
actual_result
=
swh_storage
.
snapshot_add
([
snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
1
}
assert
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
==
[
(
"snapshot"
,
empty_snapshot
),
(
"snapshot"
,
snapshot
),
]
def
test_snapshot_add_count_branches
(
self
,
swh_storage
,
sample_data
):
complete_snapshot
=
sample_data
.
snapshots
[
2
]
actual_result
=
swh_storage
.
snapshot_add
([
complete_snapshot
])
assert
actual_result
==
{
"snapshot:add"
:
1
}
snp_size
=
swh_storage
.
snapshot_count_branches
(
complete_snapshot
.
id
)
expected_snp_size
=
{
"alias"
:
1
,
"content"
:
1
,
"directory"
:
2
,
"release"
:
1
,
"revision"
:
1
,
"snapshot"
:
1
,
None
:
1
,
}
assert
snp_size
==
expected_snp_size
def
test_snapshot_add_get_paginated
(
self
,
swh_storage
,
sample_data
):
complete_snapshot
=
sample_data
.
snapshots
[
2
]
swh_storage
.
snapshot_add
([
complete_snapshot
])
snp_id
=
complete_snapshot
.
id
branches
=
complete_snapshot
.
branches
branch_names
=
list
(
sorted
(
branches
))
# Test branch_from
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
branches_from
=
b
"release"
)
rel_idx
=
branch_names
.
index
(
b
"release"
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
branches
[
name
]
for
name
in
branch_names
[
rel_idx
:]},
"next_branch"
:
None
,
}
assert
snapshot
==
expected_snapshot
# Test branches_count
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
branches_count
=
1
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
branch_names
[
0
]:
branches
[
branch_names
[
0
]],},
"next_branch"
:
b
"content"
,
}
assert
snapshot
==
expected_snapshot
# test branch_from + branches_count
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
branches_from
=
b
"directory"
,
branches_count
=
3
)
dir_idx
=
branch_names
.
index
(
b
"directory"
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
branches
[
name
]
for
name
in
branch_names
[
dir_idx
:
dir_idx
+
3
]
},
"next_branch"
:
branch_names
[
dir_idx
+
3
],
}
assert
snapshot
==
expected_snapshot
def
test_snapshot_add_get_filtered
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
complete_snapshot
=
sample_data
.
snapshots
[
2
]
swh_storage
.
origin_add
([
origin
])
visit
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
)
origin_visit1
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
swh_storage
.
snapshot_add
([
complete_snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
origin_visit1
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
complete_snapshot
.
id
,
)
]
)
snp_id
=
complete_snapshot
.
id
branches
=
complete_snapshot
.
branches
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"release"
,
"revision"
]
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
tgt
for
name
,
tgt
in
branches
.
items
()
if
tgt
and
tgt
.
target_type
in
[
TargetType
.
RELEASE
,
TargetType
.
REVISION
]
},
"next_branch"
:
None
,
}
assert
snapshot
==
expected_snapshot
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"alias"
])
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
tgt
for
name
,
tgt
in
branches
.
items
()
if
tgt
and
tgt
.
target_type
==
TargetType
.
ALIAS
},
"next_branch"
:
None
,
}
assert
snapshot
==
expected_snapshot
def
test_snapshot_add_get_filtered_and_paginated
(
self
,
swh_storage
,
sample_data
):
complete_snapshot
=
sample_data
.
snapshots
[
2
]
swh_storage
.
snapshot_add
([
complete_snapshot
])
snp_id
=
complete_snapshot
.
id
branches
=
complete_snapshot
.
branches
branch_names
=
list
(
sorted
(
branches
))
# Test branch_from
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"directory"
,
"release"
],
branches_from
=
b
"directory2"
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
branches
[
name
]
for
name
in
(
b
"directory2"
,
b
"release"
)},
"next_branch"
:
None
,
}
assert
snapshot
==
expected_snapshot
# Test branches_count
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"directory"
,
"release"
],
branches_count
=
1
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
b
"directory"
:
branches
[
b
"directory"
]},
"next_branch"
:
b
"directory2"
,
}
assert
snapshot
==
expected_snapshot
# Test branches_count
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"directory"
,
"release"
],
branches_count
=
2
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
name
:
branches
[
name
]
for
name
in
(
b
"directory"
,
b
"directory2"
)
},
"next_branch"
:
b
"release"
,
}
assert
snapshot
==
expected_snapshot
# test branch_from + branches_count
snapshot
=
swh_storage
.
snapshot_get_branches
(
snp_id
,
target_types
=
[
"directory"
,
"release"
],
branches_from
=
b
"directory2"
,
branches_count
=
1
,
)
dir_idx
=
branch_names
.
index
(
b
"directory2"
)
expected_snapshot
=
{
"id"
:
snp_id
,
"branches"
:
{
branch_names
[
dir_idx
]:
branches
[
branch_names
[
dir_idx
]],},
"next_branch"
:
b
"release"
,
}
assert
snapshot
==
expected_snapshot
def
test_snapshot_add_get_branch_by_type
(
self
,
swh_storage
,
sample_data
):
complete_snapshot
=
sample_data
.
snapshots
[
2
]
snapshot
=
complete_snapshot
.
to_dict
()
alias1
=
b
"alias1"
alias2
=
b
"alias2"
target1
=
random
.
choice
(
list
(
snapshot
[
"branches"
]
.
keys
()))
target2
=
random
.
choice
(
list
(
snapshot
[
"branches"
]
.
keys
()))
snapshot
[
"branches"
][
alias2
]
=
{
"target"
:
target2
,
"target_type"
:
"alias"
,
}
snapshot
[
"branches"
][
alias1
]
=
{
"target"
:
target1
,
"target_type"
:
"alias"
,
}
new_snapshot
=
Snapshot
.
from_dict
(
snapshot
)
swh_storage
.
snapshot_add
([
new_snapshot
])
branches
=
swh_storage
.
snapshot_get_branches
(
new_snapshot
.
id
,
target_types
=
[
"alias"
],
branches_from
=
alias1
,
branches_count
=
1
,
)[
"branches"
]
assert
len
(
branches
)
==
1
assert
alias1
in
branches
def
test_snapshot_add_get
(
self
,
swh_storage
,
sample_data
):
snapshot
=
sample_data
.
snapshot
origin
=
sample_data
.
origin
swh_storage
.
origin_add
([
origin
])
visit
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit1
,
type
=
sample_data
.
type_visit1
,
)
ov1
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
swh_storage
.
snapshot_add
([
snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
ov1
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
snapshot
.
id
,
)
]
)
expected_snapshot
=
{
**
snapshot
.
to_dict
(),
"next_branch"
:
None
}
by_id
=
swh_storage
.
snapshot_get
(
snapshot
.
id
)
assert
by_id
==
expected_snapshot
actual_visit
=
swh_storage
.
origin_visit_get_by
(
origin
.
url
,
ov1
.
visit
)
assert
actual_visit
==
ov1
visit_status
=
swh_storage
.
origin_visit_status_get_latest
(
origin
.
url
,
ov1
.
visit
,
require_snapshot
=
True
)
assert
visit_status
.
snapshot
==
snapshot
.
id
def
test_snapshot_get_random
(
self
,
swh_storage
,
sample_data
):
snapshot
,
empty_snapshot
,
complete_snapshot
=
sample_data
.
snapshots
[:
3
]
swh_storage
.
snapshot_add
([
snapshot
,
empty_snapshot
,
complete_snapshot
])
assert
swh_storage
.
snapshot_get_random
()
in
{
snapshot
.
id
,
empty_snapshot
.
id
,
complete_snapshot
.
id
,
}
def
test_snapshot_missing
(
self
,
swh_storage
,
sample_data
):
snapshot
,
missing_snapshot
=
sample_data
.
snapshots
[:
2
]
snapshots
=
[
snapshot
.
id
,
missing_snapshot
.
id
]
swh_storage
.
snapshot_add
([
snapshot
])
missing_snapshots
=
swh_storage
.
snapshot_missing
(
snapshots
)
assert
list
(
missing_snapshots
)
==
[
missing_snapshot
.
id
]
def
test_stat_counters
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
snapshot
=
sample_data
.
snapshot
revision
=
sample_data
.
revision
release
=
sample_data
.
release
directory
=
sample_data
.
directory
content
=
sample_data
.
content
expected_keys
=
[
"content"
,
"directory"
,
"origin"
,
"revision"
]
# Initially, all counters are 0
swh_storage
.
refresh_stat_counters
()
counters
=
swh_storage
.
stat_counters
()
assert
set
(
expected_keys
)
<=
set
(
counters
)
for
key
in
expected_keys
:
assert
counters
[
key
]
==
0
# Add a content. Only the content counter should increase.
swh_storage
.
content_add
([
content
])
swh_storage
.
refresh_stat_counters
()
counters
=
swh_storage
.
stat_counters
()
assert
set
(
expected_keys
)
<=
set
(
counters
)
for
key
in
expected_keys
:
if
key
!=
"content"
:
assert
counters
[
key
]
==
0
assert
counters
[
"content"
]
==
1
# Add other objects. Check their counter increased as well.
swh_storage
.
origin_add
([
origin
])
visit
=
OriginVisit
(
origin
=
origin
.
url
,
date
=
sample_data
.
date_visit2
,
type
=
sample_data
.
type_visit2
,
)
origin_visit1
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
swh_storage
.
snapshot_add
([
snapshot
])
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin
.
url
,
visit
=
origin_visit1
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
snapshot
.
id
,
)
]
)
swh_storage
.
directory_add
([
directory
])
swh_storage
.
revision_add
([
revision
])
swh_storage
.
release_add
([
release
])
swh_storage
.
refresh_stat_counters
()
counters
=
swh_storage
.
stat_counters
()
assert
counters
[
"content"
]
==
1
assert
counters
[
"directory"
]
==
1
assert
counters
[
"snapshot"
]
==
1
assert
counters
[
"origin"
]
==
1
assert
counters
[
"origin_visit"
]
==
1
assert
counters
[
"revision"
]
==
1
assert
counters
[
"release"
]
==
1
assert
counters
[
"snapshot"
]
==
1
if
"person"
in
counters
:
assert
counters
[
"person"
]
==
3
def
test_content_find_ctime
(
self
,
swh_storage
,
sample_data
):
origin_content
=
sample_data
.
content
ctime
=
round_to_milliseconds
(
now
())
content
=
attr
.
evolve
(
origin_content
,
data
=
None
,
ctime
=
ctime
)
swh_storage
.
content_add_metadata
([
content
])
actually_present
=
swh_storage
.
content_find
({
"sha1"
:
content
.
sha1
})
assert
actually_present
[
0
]
==
content
assert
actually_present
[
0
]
.
ctime
is
not
None
assert
actually_present
[
0
]
.
ctime
.
tzinfo
is
not
None
def
test_content_find_with_present_content
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
expected_content
=
attr
.
evolve
(
content
,
data
=
None
)
# 1. with something to find
swh_storage
.
content_add
([
content
])
actually_present
=
swh_storage
.
content_find
({
"sha1"
:
content
.
sha1
})
assert
1
==
len
(
actually_present
)
assert
actually_present
[
0
]
==
expected_content
# 2. with something to find
actually_present
=
swh_storage
.
content_find
({
"sha1_git"
:
content
.
sha1_git
})
assert
1
==
len
(
actually_present
)
assert
actually_present
[
0
]
==
expected_content
# 3. with something to find
actually_present
=
swh_storage
.
content_find
({
"sha256"
:
content
.
sha256
})
assert
1
==
len
(
actually_present
)
assert
actually_present
[
0
]
==
expected_content
# 4. with something to find
actually_present
=
swh_storage
.
content_find
(
content
.
hashes
())
assert
1
==
len
(
actually_present
)
assert
actually_present
[
0
]
==
expected_content
def
test_content_find_with_non_present_content
(
self
,
swh_storage
,
sample_data
):
missing_content
=
sample_data
.
skipped_content
# 1. with something that does not exist
actually_present
=
swh_storage
.
content_find
({
"sha1"
:
missing_content
.
sha1
})
assert
actually_present
==
[]
# 2. with something that does not exist
actually_present
=
swh_storage
.
content_find
(
{
"sha1_git"
:
missing_content
.
sha1_git
}
)
assert
actually_present
==
[]
# 3. with something that does not exist
actually_present
=
swh_storage
.
content_find
({
"sha256"
:
missing_content
.
sha256
})
assert
actually_present
==
[]
def
test_content_find_with_duplicate_input
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
# Create fake data with colliding sha256 and blake2s256
sha1_array
=
bytearray
(
content
.
sha1
)
sha1_array
[
0
]
+=
1
sha1git_array
=
bytearray
(
content
.
sha1_git
)
sha1git_array
[
0
]
+=
1
duplicated_content
=
attr
.
evolve
(
content
,
sha1
=
bytes
(
sha1_array
),
sha1_git
=
bytes
(
sha1git_array
)
)
# Inject the data
swh_storage
.
content_add
([
content
,
duplicated_content
])
actual_result
=
swh_storage
.
content_find
(
{
"blake2s256"
:
duplicated_content
.
blake2s256
,
"sha256"
:
duplicated_content
.
sha256
,
}
)
expected_content
=
attr
.
evolve
(
content
,
data
=
None
)
expected_duplicated_content
=
attr
.
evolve
(
duplicated_content
,
data
=
None
)
for
result
in
actual_result
:
assert
result
in
[
expected_content
,
expected_duplicated_content
]
def
test_content_find_with_duplicate_sha256
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
hashes
=
{}
# Create fake data with colliding sha256
for
hashalgo
in
(
"sha1"
,
"sha1_git"
,
"blake2s256"
):
value
=
bytearray
(
getattr
(
content
,
hashalgo
))
value
[
0
]
+=
1
hashes
[
hashalgo
]
=
bytes
(
value
)
duplicated_content
=
attr
.
evolve
(
content
,
sha1
=
hashes
[
"sha1"
],
sha1_git
=
hashes
[
"sha1_git"
],
blake2s256
=
hashes
[
"blake2s256"
],
)
swh_storage
.
content_add
([
content
,
duplicated_content
])
actual_result
=
swh_storage
.
content_find
({
"sha256"
:
duplicated_content
.
sha256
})
assert
len
(
actual_result
)
==
2
expected_content
=
attr
.
evolve
(
content
,
data
=
None
)
expected_duplicated_content
=
attr
.
evolve
(
duplicated_content
,
data
=
None
)
for
result
in
actual_result
:
assert
result
in
[
expected_content
,
expected_duplicated_content
]
# Find with both sha256 and blake2s256
actual_result
=
swh_storage
.
content_find
(
{
"sha256"
:
duplicated_content
.
sha256
,
"blake2s256"
:
duplicated_content
.
blake2s256
,
}
)
assert
len
(
actual_result
)
==
1
assert
actual_result
==
[
expected_duplicated_content
]
def
test_content_find_with_duplicate_blake2s256
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
# Create fake data with colliding sha256 and blake2s256
sha1_array
=
bytearray
(
content
.
sha1
)
sha1_array
[
0
]
+=
1
sha1git_array
=
bytearray
(
content
.
sha1_git
)
sha1git_array
[
0
]
+=
1
sha256_array
=
bytearray
(
content
.
sha256
)
sha256_array
[
0
]
+=
1
duplicated_content
=
attr
.
evolve
(
content
,
sha1
=
bytes
(
sha1_array
),
sha1_git
=
bytes
(
sha1git_array
),
sha256
=
bytes
(
sha256_array
),
)
swh_storage
.
content_add
([
content
,
duplicated_content
])
actual_result
=
swh_storage
.
content_find
(
{
"blake2s256"
:
duplicated_content
.
blake2s256
}
)
expected_content
=
attr
.
evolve
(
content
,
data
=
None
)
expected_duplicated_content
=
attr
.
evolve
(
duplicated_content
,
data
=
None
)
for
result
in
actual_result
:
assert
result
in
[
expected_content
,
expected_duplicated_content
]
# Find with both sha256 and blake2s256
actual_result
=
swh_storage
.
content_find
(
{
"sha256"
:
duplicated_content
.
sha256
,
"blake2s256"
:
duplicated_content
.
blake2s256
,
}
)
assert
actual_result
==
[
expected_duplicated_content
]
def
test_content_find_bad_input
(
self
,
swh_storage
):
# 1. with no hash to lookup
with
pytest
.
raises
(
StorageArgumentException
):
swh_storage
.
content_find
({})
# need at least one hash
# 2. with bad hash
with
pytest
.
raises
(
StorageArgumentException
):
swh_storage
.
content_find
({
"unknown-sha1"
:
"something"
})
# not the right key
def
test_object_find_by_sha1_git
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
directory
=
sample_data
.
directory
revision
=
sample_data
.
revision
release
=
sample_data
.
release
sha1_gits
=
[
b
"00000000000000000000"
]
expected
=
{
b
"00000000000000000000"
:
[],
}
swh_storage
.
content_add
([
content
])
sha1_gits
.
append
(
content
.
sha1_git
)
expected
[
content
.
sha1_git
]
=
[
{
"sha1_git"
:
content
.
sha1_git
,
"type"
:
"content"
,}
]
swh_storage
.
directory_add
([
directory
])
sha1_gits
.
append
(
directory
.
id
)
expected
[
directory
.
id
]
=
[{
"sha1_git"
:
directory
.
id
,
"type"
:
"directory"
,}]
swh_storage
.
revision_add
([
revision
])
sha1_gits
.
append
(
revision
.
id
)
expected
[
revision
.
id
]
=
[{
"sha1_git"
:
revision
.
id
,
"type"
:
"revision"
,}]
swh_storage
.
release_add
([
release
])
sha1_gits
.
append
(
release
.
id
)
expected
[
release
.
id
]
=
[{
"sha1_git"
:
release
.
id
,
"type"
:
"release"
,}]
ret
=
swh_storage
.
object_find_by_sha1_git
(
sha1_gits
)
assert
expected
==
ret
def
test_metadata_fetcher_add_get
(
self
,
swh_storage
,
sample_data
):
fetcher
=
sample_data
.
metadata_fetcher
actual_fetcher
=
swh_storage
.
metadata_fetcher_get
(
fetcher
.
name
,
fetcher
.
version
)
assert
actual_fetcher
is
None
# does not exist
swh_storage
.
metadata_fetcher_add
([
fetcher
])
res
=
swh_storage
.
metadata_fetcher_get
(
fetcher
.
name
,
fetcher
.
version
)
assert
res
==
fetcher
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
[
(
"metadata_fetcher"
,
fetcher
),
]
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_metadata_fetcher_add_zero
(
self
,
swh_storage
,
sample_data
):
fetcher
=
sample_data
.
metadata_fetcher
actual_fetcher
=
swh_storage
.
metadata_fetcher_get
(
fetcher
.
name
,
fetcher
.
version
)
assert
actual_fetcher
is
None
# does not exist
swh_storage
.
metadata_fetcher_add
([])
def
test_metadata_authority_add_get
(
self
,
swh_storage
,
sample_data
):
authority
=
sample_data
.
metadata_authority
actual_authority
=
swh_storage
.
metadata_authority_get
(
authority
.
type
,
authority
.
url
)
assert
actual_authority
is
None
# does not exist
swh_storage
.
metadata_authority_add
([
authority
])
res
=
swh_storage
.
metadata_authority_get
(
authority
.
type
,
authority
.
url
)
assert
res
==
authority
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
[
(
"metadata_authority"
,
authority
),
]
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_metadata_authority_add_zero
(
self
,
swh_storage
,
sample_data
):
authority
=
sample_data
.
metadata_authority
actual_authority
=
swh_storage
.
metadata_authority_get
(
authority
.
type
,
authority
.
url
)
assert
actual_authority
is
None
# does not exist
swh_storage
.
metadata_authority_add
([])
def
test_content_metadata_add
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
content_metadata
=
sample_data
.
content_metadata
[:
2
]
content_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
hash_to_bytes
(
content
.
sha1_git
)
)
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
(
content_metadata
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
)
assert
result
.
next_page_token
is
None
assert
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,))
==
list
(
content_metadata
)
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
[
(
"metadata_authority"
,
authority
),
(
"metadata_fetcher"
,
fetcher
),
]
+
[(
"raw_extrinsic_metadata"
,
item
)
for
item
in
content_metadata
]
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_content_metadata_add_duplicate
(
self
,
swh_storage
,
sample_data
):
"""Duplicates should be silently updated."""
content
=
sample_data
.
content
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
content_metadata
,
content_metadata2
=
sample_data
.
content_metadata
[:
2
]
content_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
hash_to_bytes
(
content
.
sha1_git
)
)
new_content_metadata2
=
attr
.
evolve
(
content_metadata2
,
format
=
"new-format"
,
metadata
=
b
"new-metadata"
,
)
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
content_metadata
,
content_metadata2
])
swh_storage
.
raw_extrinsic_metadata_add
([
new_content_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
)
assert
result
.
next_page_token
is
None
expected_results1
=
(
content_metadata
,
new_content_metadata2
)
expected_results2
=
(
content_metadata
,
content_metadata2
)
assert
tuple
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,))
in
(
expected_results1
,
# cassandra
expected_results2
,
# postgresql
)
def
test_content_metadata_get
(
self
,
swh_storage
,
sample_data
):
content
,
content2
=
sample_data
.
contents
[:
2
]
fetcher
,
fetcher2
=
sample_data
.
fetchers
[:
2
]
authority
,
authority2
=
sample_data
.
authorities
[:
2
]
(
content1_metadata1
,
content1_metadata2
,
content1_metadata3
,
)
=
sample_data
.
content_metadata
[:
3
]
content1_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
content
.
sha1_git
)
content2_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
content2
.
sha1_git
)
content2_metadata
=
attr
.
evolve
(
content1_metadata2
,
target
=
content2_swhid
)
swh_storage
.
metadata_authority_add
([
authority
,
authority2
])
swh_storage
.
metadata_fetcher_add
([
fetcher
,
fetcher2
])
swh_storage
.
raw_extrinsic_metadata_add
(
[
content1_metadata1
,
content1_metadata2
,
content1_metadata3
,
content2_metadata
,
]
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content1_swhid
,
authority
)
assert
result
.
next_page_token
is
None
assert
[
content1_metadata1
,
content1_metadata2
]
==
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,)
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content1_swhid
,
authority2
)
assert
result
.
next_page_token
is
None
assert
[
content1_metadata3
]
==
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,)
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content2_swhid
,
authority
)
assert
result
.
next_page_token
is
None
assert
[
content2_metadata
]
==
list
(
result
.
results
,)
def
test_content_metadata_get_after
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
content_metadata
,
content_metadata2
=
sample_data
.
content_metadata
[:
2
]
content_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
content
.
sha1_git
)
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
content_metadata
,
content_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
after
=
content_metadata
.
discovery_date
-
timedelta
(
seconds
=
1
),
)
assert
result
.
next_page_token
is
None
assert
[
content_metadata
,
content_metadata2
]
==
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,)
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
after
=
content_metadata
.
discovery_date
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
content_metadata2
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
after
=
content_metadata2
.
discovery_date
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[]
def
test_content_metadata_get_paginate
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
content_metadata
,
content_metadata2
=
sample_data
.
content_metadata
[:
2
]
content_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
content
.
sha1_git
)
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
content_metadata
,
content_metadata2
])
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
limit
=
1
)
assert
result
.
next_page_token
is
not
None
assert
result
.
results
==
[
content_metadata
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
limit
=
1
,
page_token
=
result
.
next_page_token
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
content_metadata2
]
def
test_content_metadata_get_paginate_same_date
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
fetcher1
,
fetcher2
=
sample_data
.
fetchers
[:
2
]
authority
=
sample_data
.
metadata_authority
content_metadata
,
content_metadata2
=
sample_data
.
content_metadata
[:
2
]
content_swhid
=
SWHID
(
object_type
=
"content"
,
object_id
=
content
.
sha1_git
)
swh_storage
.
metadata_fetcher_add
([
fetcher1
,
fetcher2
])
swh_storage
.
metadata_authority_add
([
authority
])
new_content_metadata2
=
attr
.
evolve
(
content_metadata2
,
discovery_date
=
content_metadata2
.
discovery_date
,
fetcher
=
attr
.
evolve
(
fetcher2
,
metadata
=
None
),
)
swh_storage
.
raw_extrinsic_metadata_add
(
[
content_metadata
,
new_content_metadata2
]
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
limit
=
1
)
assert
result
.
next_page_token
is
not
None
assert
result
.
results
==
[
content_metadata
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
content_swhid
,
authority
,
limit
=
1
,
page_token
=
result
.
next_page_token
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
new_content_metadata2
]
def
test_content_metadata_get__invalid_id
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
content_metadata
,
content_metadata2
=
sample_data
.
content_metadata
[:
2
]
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
content_metadata
,
content_metadata2
])
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"SWHID"
):
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
CONTENT
,
origin
.
url
,
authority
)
def
test_origin_metadata_add
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
)
assert
result
.
next_page_token
is
None
assert
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
))
==
[
origin_metadata
,
origin_metadata2
,
]
actual_objects
=
list
(
swh_storage
.
journal_writer
.
journal
.
objects
)
expected_objects
=
[
(
"metadata_authority"
,
authority
),
(
"metadata_fetcher"
,
fetcher
),
(
"raw_extrinsic_metadata"
,
origin_metadata
),
(
"raw_extrinsic_metadata"
,
origin_metadata2
),
]
for
obj
in
expected_objects
:
assert
obj
in
actual_objects
def
test_origin_metadata_add_duplicate
(
self
,
swh_storage
,
sample_data
):
"""Duplicates should be silently updated."""
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
new_origin_metadata2
=
attr
.
evolve
(
origin_metadata2
,
format
=
"new-format"
,
metadata
=
b
"new-metadata"
,
)
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
swh_storage
.
raw_extrinsic_metadata_add
([
new_origin_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
)
assert
result
.
next_page_token
is
None
# which of the two behavior happens is backend-specific.
expected_results1
=
(
origin_metadata
,
new_origin_metadata2
)
expected_results2
=
(
origin_metadata
,
origin_metadata2
)
assert
tuple
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,))
in
(
expected_results1
,
# cassandra
expected_results2
,
# postgresql
)
def
test_origin_metadata_get
(
self
,
swh_storage
,
sample_data
):
origin
,
origin2
=
sample_data
.
origins
[:
2
]
fetcher
,
fetcher2
=
sample_data
.
fetchers
[:
2
]
authority
,
authority2
=
sample_data
.
authorities
[:
2
]
(
origin1_metadata1
,
origin1_metadata2
,
origin1_metadata3
,
)
=
sample_data
.
origin_metadata
[:
3
]
assert
swh_storage
.
origin_add
([
origin
,
origin2
])
==
{
"origin:add"
:
2
}
origin2_metadata
=
attr
.
evolve
(
origin1_metadata2
,
target
=
origin2
.
url
)
swh_storage
.
metadata_authority_add
([
authority
,
authority2
])
swh_storage
.
metadata_fetcher_add
([
fetcher
,
fetcher2
])
swh_storage
.
raw_extrinsic_metadata_add
(
[
origin1_metadata1
,
origin1_metadata2
,
origin1_metadata3
,
origin2_metadata
]
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
)
assert
result
.
next_page_token
is
None
assert
[
origin1_metadata1
,
origin1_metadata2
]
==
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,)
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority2
)
assert
result
.
next_page_token
is
None
assert
[
origin1_metadata3
]
==
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,)
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin2
.
url
,
authority
)
assert
result
.
next_page_token
is
None
assert
[
origin2_metadata
]
==
list
(
result
.
results
,)
def
test_origin_metadata_get_after
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
after
=
origin_metadata
.
discovery_date
-
timedelta
(
seconds
=
1
),
)
assert
result
.
next_page_token
is
None
assert
list
(
sorted
(
result
.
results
,
key
=
lambda
x
:
x
.
discovery_date
,))
==
[
origin_metadata
,
origin_metadata2
,
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
after
=
origin_metadata
.
discovery_date
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
origin_metadata2
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
after
=
origin_metadata2
.
discovery_date
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[]
def
test_origin_metadata_get_paginate
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
)
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
limit
=
1
)
assert
result
.
next_page_token
is
not
None
assert
result
.
results
==
[
origin_metadata
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
limit
=
1
,
page_token
=
result
.
next_page_token
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
origin_metadata2
]
def
test_origin_metadata_get_paginate_same_date
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher1
,
fetcher2
=
sample_data
.
fetchers
[:
2
]
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher1
,
fetcher2
])
swh_storage
.
metadata_authority_add
([
authority
])
new_origin_metadata2
=
attr
.
evolve
(
origin_metadata2
,
discovery_date
=
origin_metadata2
.
discovery_date
,
fetcher
=
attr
.
evolve
(
fetcher2
,
metadata
=
None
),
)
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
new_origin_metadata2
])
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
limit
=
1
)
assert
result
.
next_page_token
is
not
None
assert
result
.
results
==
[
origin_metadata
]
result
=
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
origin
.
url
,
authority
,
limit
=
1
,
page_token
=
result
.
next_page_token
,
)
assert
result
.
next_page_token
is
None
assert
result
.
results
==
[
new_origin_metadata2
]
def
test_origin_metadata_add_missing_authority
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
fetcher
=
sample_data
.
metadata_fetcher
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher
])
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"authority"
):
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
def
test_origin_metadata_add_missing_fetcher
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
authority
=
sample_data
.
metadata_authority
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_authority_add
([
authority
])
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"fetcher"
):
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
def
test_origin_metadata_get__invalid_id_type
(
self
,
swh_storage
,
sample_data
):
origin
=
sample_data
.
origin
authority
=
sample_data
.
metadata_authority
fetcher
=
sample_data
.
metadata_fetcher
origin_metadata
,
origin_metadata2
=
sample_data
.
origin_metadata
[:
2
]
content_metadata
=
sample_data
.
content_metadata
[
0
]
assert
swh_storage
.
origin_add
([
origin
])
==
{
"origin:add"
:
1
}
swh_storage
.
metadata_fetcher_add
([
fetcher
])
swh_storage
.
metadata_authority_add
([
authority
])
swh_storage
.
raw_extrinsic_metadata_add
([
origin_metadata
,
origin_metadata2
])
with
pytest
.
raises
(
StorageArgumentException
,
match
=
"SWHID"
):
swh_storage
.
raw_extrinsic_metadata_get
(
MetadataTargetType
.
ORIGIN
,
content_metadata
.
target
,
authority
,
)
class
TestStorageGeneratedData
:
def
test_generate_content_get_data
(
self
,
swh_storage
,
swh_contents
):
contents_with_data
=
[
c
for
c
in
swh_contents
if
c
.
status
!=
"absent"
]
# retrieve contents
for
content
in
contents_with_data
:
actual_content_data
=
swh_storage
.
content_get_data
(
content
.
sha1
)
assert
actual_content_data
is
not
None
assert
actual_content_data
==
content
.
data
def
test_generate_content_get
(
self
,
swh_storage
,
swh_contents
):
expected_contents
=
[
attr
.
evolve
(
c
,
data
=
None
)
for
c
in
swh_contents
if
c
.
status
!=
"absent"
]
actual_contents
=
swh_storage
.
content_get
([
c
.
sha1
for
c
in
expected_contents
])
assert
len
(
actual_contents
)
==
len
(
expected_contents
)
assert
actual_contents
==
expected_contents
@pytest.mark.parametrize
(
"limit"
,
[
1
,
7
,
10
,
100
,
1000
])
def
test_origin_list
(
self
,
swh_storage
,
swh_origins
,
limit
):
returned_origins
=
[]
page_token
=
None
i
=
0
while
True
:
actual_page
=
swh_storage
.
origin_list
(
page_token
=
page_token
,
limit
=
limit
)
assert
len
(
actual_page
.
results
)
<=
limit
returned_origins
.
extend
(
actual_page
.
results
)
i
+=
1
page_token
=
actual_page
.
next_page_token
if
page_token
is
None
:
assert
i
*
limit
>=
len
(
swh_origins
)
break
else
:
assert
len
(
actual_page
.
results
)
==
limit
assert
sorted
(
returned_origins
)
==
sorted
(
swh_origins
)
def
test_origin_count
(
self
,
swh_storage
,
sample_data
):
swh_storage
.
origin_add
(
sample_data
.
origins
)
assert
swh_storage
.
origin_count
(
"github"
)
==
3
assert
swh_storage
.
origin_count
(
"gitlab"
)
==
2
assert
swh_storage
.
origin_count
(
".*user.*"
,
regexp
=
True
)
==
5
assert
swh_storage
.
origin_count
(
".*user.*"
,
regexp
=
False
)
==
0
assert
swh_storage
.
origin_count
(
".*user1.*"
,
regexp
=
True
)
==
2
assert
swh_storage
.
origin_count
(
".*user1.*"
,
regexp
=
False
)
==
0
def
test_origin_count_with_visit_no_visits
(
self
,
swh_storage
,
sample_data
):
swh_storage
.
origin_add
(
sample_data
.
origins
)
# none of them have visits, so with_visit=True => 0
assert
swh_storage
.
origin_count
(
"github"
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
"gitlab"
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
".*user.*"
,
regexp
=
True
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
".*user.*"
,
regexp
=
False
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
".*user1.*"
,
regexp
=
True
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
".*user1.*"
,
regexp
=
False
,
with_visit
=
True
)
==
0
def
test_origin_count_with_visit_with_visits_no_snapshot
(
self
,
swh_storage
,
sample_data
):
swh_storage
.
origin_add
(
sample_data
.
origins
)
origin_url
=
"https://github.com/user1/repo1"
visit
=
OriginVisit
(
origin
=
origin_url
,
date
=
now
(),
type
=
"git"
,)
swh_storage
.
origin_visit_add
([
visit
])
assert
swh_storage
.
origin_count
(
"github"
,
with_visit
=
False
)
==
3
# it has a visit, but no snapshot, so with_visit=True => 0
assert
swh_storage
.
origin_count
(
"github"
,
with_visit
=
True
)
==
0
assert
swh_storage
.
origin_count
(
"gitlab"
,
with_visit
=
False
)
==
2
# these gitlab origins have no visit
assert
swh_storage
.
origin_count
(
"gitlab"
,
with_visit
=
True
)
==
0
assert
(
swh_storage
.
origin_count
(
"github.*user1"
,
regexp
=
True
,
with_visit
=
False
)
==
1
)
assert
(
swh_storage
.
origin_count
(
"github.*user1"
,
regexp
=
True
,
with_visit
=
True
)
==
0
)
assert
swh_storage
.
origin_count
(
"github"
,
regexp
=
True
,
with_visit
=
True
)
==
0
def
test_origin_count_with_visit_with_visits_and_snapshot
(
self
,
swh_storage
,
sample_data
):
snapshot
=
sample_data
.
snapshot
swh_storage
.
origin_add
(
sample_data
.
origins
)
swh_storage
.
snapshot_add
([
snapshot
])
origin_url
=
"https://github.com/user1/repo1"
visit
=
OriginVisit
(
origin
=
origin_url
,
date
=
now
(),
type
=
"git"
,)
visit
=
swh_storage
.
origin_visit_add
([
visit
])[
0
]
swh_storage
.
origin_visit_status_add
(
[
OriginVisitStatus
(
origin
=
origin_url
,
visit
=
visit
.
visit
,
date
=
now
(),
status
=
"ongoing"
,
snapshot
=
snapshot
.
id
,
)
]
)
assert
swh_storage
.
origin_count
(
"github"
,
with_visit
=
False
)
==
3
# github/user1 has a visit and a snapshot, so with_visit=True => 1
assert
swh_storage
.
origin_count
(
"github"
,
with_visit
=
True
)
==
1
assert
(
swh_storage
.
origin_count
(
"github.*user1"
,
regexp
=
True
,
with_visit
=
False
)
==
1
)
assert
(
swh_storage
.
origin_count
(
"github.*user1"
,
regexp
=
True
,
with_visit
=
True
)
==
1
)
assert
swh_storage
.
origin_count
(
"github"
,
regexp
=
True
,
with_visit
=
True
)
==
1
@settings
(
suppress_health_check
=
[
HealthCheck
.
too_slow
])
@given
(
strategies
.
lists
(
objects
(
split_content
=
True
),
max_size
=
2
))
def
test_add_arbitrary
(
self
,
swh_storage
,
objects
):
for
(
obj_type
,
obj
)
in
objects
:
if
obj
.
object_type
==
"origin_visit"
:
swh_storage
.
origin_add
([
Origin
(
url
=
obj
.
origin
)])
visit
=
OriginVisit
(
origin
=
obj
.
origin
,
date
=
obj
.
date
,
type
=
obj
.
type
,)
swh_storage
.
origin_visit_add
([
visit
])
else
:
method
=
getattr
(
swh_storage
,
obj_type
+
"_add"
)
try
:
method
([
obj
])
except
HashCollision
:
pass
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:07 PM (10 w, 3 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3268340
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment