Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8393680
test_postgresql.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
test_postgresql.py
View Options
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
contextlib
import
contextmanager
import
queue
import
threading
from
unittest.mock
import
Mock
import
attr
import
pytest
from
swh.storage.postgresql.db
import
Db
from
swh.storage.tests.storage_tests
import
TestStorage
# noqa
from
swh.storage.tests.storage_tests
import
TestStorageGeneratedData
# noqa
from
swh.storage.utils
import
now
@contextmanager
def db_transaction(storage):
    """Open a database handle on ``storage`` and a transaction on that handle.

    Yields:
        A ``(db, cur)`` pair: the object produced by ``storage.db()`` and the
        object produced by ``db.transaction()``.

    Both context managers are exited (transaction first, then the database
    handle) when the caller's ``with`` block ends.
    """
    with storage.db() as handle, handle.transaction() as cursor:
        yield handle, cursor
@pytest.mark.db
class TestLocalStorage:
    """Test the local storage"""

    # Only meaningful for the local storage: the objstorage backend is
    # replaced by a mock whose content_add always raises.
    def test_content_add_objstorage_exception(self, swh_storage, sample_data):
        """A failing objstorage write propagates and the content stays missing."""
        failure = Exception("mocked broken objstorage")
        swh_storage.objstorage.content_add = Mock(side_effect=failure)

        sample = sample_data.content

        with pytest.raises(Exception, match="mocked broken"):
            swh_storage.content_add([sample])

        # The content must still be reported as missing after the failed add.
        still_missing = swh_storage.content_missing([sample.hashes()])
        assert list(still_missing) == [sample.sha1]
@pytest.mark.db
class TestStorageRaceConditions:
    """Tests adding the same content from two threads at the same time."""

    @pytest.mark.xfail
    def test_content_add_race(self, swh_storage, sample_data):
        """Run ``content_add`` for one content in two concurrent threads.

        Each thread reports ``(thread ident, "data", result)`` on success or
        ``(thread ident, "exc", exception)`` on failure through a queue; the
        test then requires exactly two reports, from distinct threads, both
        successful.
        """
        content = sample_data.content
        outcomes = queue.Queue()

        def worker():
            # Each worker opens its own transaction and reports how it went.
            try:
                with db_transaction(swh_storage) as (db, cur):
                    summary = swh_storage.content_add([content], db=db, cur=cur)
                outcomes.put((threading.get_ident(), "data", summary))
            except Exception as exc:
                outcomes.put((threading.get_ident(), "exc", exc))

        workers = [threading.Thread(target=worker) for _ in range(2)]

        # Start both workers back to back.
        # Sleeping between the two starts avoids the race condition:
        # import time
        # time.sleep(1)
        for runner in workers:
            runner.start()
        for runner in workers:
            runner.join()

        first = outcomes.get(block=False)
        second = outcomes.get(block=False)

        # Exactly two reports are expected: a third read must find nothing.
        with pytest.raises(queue.Empty):
            outcomes.get(block=False)

        first_ident, first_kind, first_payload = first
        second_ident, second_kind, second_payload = second

        assert first_ident != second_ident
        assert first_kind == "data", "Got exception %r in Thread %s" % (
            first_payload,
            first_ident,
        )
        assert second_kind == "data", "Got exception %r in Thread %s" % (
            second_payload,
            second_ident,
        )
@pytest.mark.db
class TestPgStorage:
    """This class is dedicated for the rare case where the schema needs to
    be altered dynamically.

    Otherwise, the tests could be blocking when ran altogether.

    """

    def test_content_update_with_new_cols(self, swh_storage, sample_data):
        """Update two temporarily added columns of a stored content row.

        The extra ``test``/``test2`` columns are dropped in a ``finally``
        clause, so a failing step or assertion does not leave the altered
        schema behind.
        """
        content = sample_data.contents[0]
        swh_storage.journal_writer.journal = None  # TODO, not supported

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                """alter table content
                   add column test text default null,
                   add column test2 text default null"""
            )

        # From here on the schema is altered: always restore it on the way out.
        try:
            swh_storage.content_add([content])

            cont = content.to_dict()
            cont["test"] = "value-1"
            cont["test2"] = "value-2"

            swh_storage.content_update([cont], keys=["test", "test2"])

            with db_transaction(swh_storage) as (_, cur):
                cur.execute(
                    """SELECT sha1, sha1_git, sha256, length, status,
                       test, test2
                       FROM content WHERE sha1 = %s""",
                    (cont["sha1"],),
                )
                datum = cur.fetchone()

            assert datum == (
                cont["sha1"],
                cont["sha1_git"],
                cont["sha256"],
                cont["length"],
                "visible",
                cont["test"],
                cont["test2"],
            )
        finally:
            with db_transaction(swh_storage) as (_, cur):
                cur.execute(
                    """alter table content drop column test,
                       drop column test2"""
                )

    def test_content_add_db(self, swh_storage, sample_data):
        """Check the summary, database row and journal entry of ``content_add``."""
        content = sample_data.content

        actual_result = swh_storage.content_add([content])

        assert actual_result == {
            "content:add": 1,
            "content:add:bytes": content.length,
        }

        if hasattr(swh_storage, "objstorage"):
            assert content.sha1 in swh_storage.objstorage.objstorage

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                "SELECT sha1, sha1_git, sha256, length, status"
                " FROM content WHERE sha1 = %s",
                (content.sha1,),
            )
            datum = cur.fetchone()

        assert datum == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.length,
            "visible",
        )

        contents = [
            obj
            for (obj_type, obj) in swh_storage.journal_writer.journal.objects
            if obj_type == "content"
        ]
        assert len(contents) == 1
        # The journal entry is compared against the content without its data.
        assert contents[0] == attr.evolve(content, data=None)

    def test_content_add_metadata_db(self, swh_storage, sample_data):
        """Check ``content_add_metadata``: row and journal entry, no objstorage write."""
        content = attr.evolve(sample_data.content, data=None, ctime=now())

        actual_result = swh_storage.content_add_metadata([content])

        assert actual_result == {
            "content:add": 1,
        }

        if hasattr(swh_storage, "objstorage"):
            assert content.sha1 not in swh_storage.objstorage.objstorage

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                "SELECT sha1, sha1_git, sha256, length, status"
                " FROM content WHERE sha1 = %s",
                (content.sha1,),
            )
            datum = cur.fetchone()

        assert datum == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.length,
            "visible",
        )

        contents = [
            obj
            for (obj_type, obj) in swh_storage.journal_writer.journal.objects
            if obj_type == "content"
        ]
        assert len(contents) == 1
        assert contents[0] == content

    def test_skipped_content_add_db(self, swh_storage, sample_data):
        """Add skipped contents (one duplicated) and check the two stored rows."""
        content, cont2 = sample_data.skipped_contents[:2]
        content2 = attr.evolve(cont2, blake2s256=None)

        actual_result = swh_storage.skipped_content_add([content, content, content2])

        # ``content`` is passed twice, so a count of 2 or 3 is accepted.
        assert 2 <= actual_result.pop("skipped_content:add") <= 3
        assert actual_result == {}

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                "SELECT sha1, sha1_git, sha256, blake2s256, "
                "length, status, reason "
                "FROM skipped_content ORDER BY sha1_git"
            )
            dbdata = cur.fetchall()

        assert len(dbdata) == 2
        assert dbdata[0] == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.blake2s256,
            content.length,
            "absent",
            "Content too long",
        )

        assert dbdata[1] == (
            content2.sha1,
            content2.sha1_git,
            content2.sha256,
            content2.blake2s256,
            content2.length,
            "absent",
            "Content too long",
        )

    def test_clear_buffers(self, swh_storage):
        """Calling clear buffers on real storage does nothing

        """
        assert swh_storage.clear_buffers() is None

    def test_flush(self, swh_storage):
        """Calling flush on real storage returns an empty dict

        """
        assert swh_storage.flush() == {}

    def test_dbversion(self, swh_storage):
        """``check_dbversion`` is truthy on the unmodified test database."""
        with swh_storage.db() as db:
            assert db.check_dbversion()

    def test_dbversion_mismatch(self, swh_storage, monkeypatch):
        """``check_dbversion`` is False once ``Db.current_version`` is patched to -1."""
        monkeypatch.setattr(Db, "current_version", -1)
        with swh_storage.db() as db:
            assert db.check_dbversion() is False

    def test_check_config(self, swh_storage):
        """``check_config`` is truthy with and without the write check."""
        assert swh_storage.check_config(check_write=True)
        assert swh_storage.check_config(check_write=False)

    def test_check_config_dbversion(self, swh_storage, monkeypatch):
        """``check_config`` is False once ``Db.current_version`` is patched to -1."""
        monkeypatch.setattr(Db, "current_version", -1)
        assert swh_storage.check_config(check_write=True) is False
        assert swh_storage.check_config(check_write=False) is False
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Wed, Jun 4, 7:16 PM (5 d, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3334157
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment