Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9336754
test_postgresql.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
test_postgresql.py
View Options
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
contextlib
import
contextmanager
import
queue
import
threading
from
unittest.mock
import
Mock
import
attr
import
pytest
from
swh.model.model
import
Person
from
swh.storage.postgresql.db
import
Db
from
swh.storage.tests.storage_tests
import
TestStorage
as
_TestStorage
from
swh.storage.tests.storage_tests
import
TestStorageGeneratedData
# noqa
from
swh.storage.utils
import
now
@contextmanager
def
db_transaction
(
storage
):
with
storage
.
db
()
as
db
:
with
db
.
transaction
()
as
cur
:
yield
db
,
cur
class
TestStorage
(
_TestStorage
):
@pytest.mark.skip
(
"Directory pagination is not implemented in the postgresql backend yet."
)
def
test_directory_get_entries_pagination
(
self
):
pass
@pytest.mark.db
class
TestLocalStorage
:
"""Test the local storage"""
# This test is only relevant on the local storage, with an actual
# objstorage raising an exception
def
test_content_add_objstorage_exception
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
swh_storage
.
objstorage
.
content_add
=
Mock
(
side_effect
=
Exception
(
"mocked broken objstorage"
)
)
with
pytest
.
raises
(
Exception
,
match
=
"mocked broken"
):
swh_storage
.
content_add
([
content
])
missing
=
list
(
swh_storage
.
content_missing
([
content
.
hashes
()]))
assert
missing
==
[
content
.
sha1
]
@pytest.mark.db
class
TestStorageRaceConditions
:
@pytest.mark.xfail
def
test_content_add_race
(
self
,
swh_storage
,
sample_data
):
content
=
attr
.
evolve
(
sample_data
.
content
,
ctime
=
now
())
results
=
queue
.
Queue
()
def
thread
():
try
:
with
db_transaction
(
swh_storage
)
as
(
db
,
cur
):
ret
=
swh_storage
.
_content_add_metadata
(
db
,
cur
,
[
content
])
results
.
put
((
threading
.
get_ident
(),
"data"
,
ret
))
except
Exception
as
e
:
results
.
put
((
threading
.
get_ident
(),
"exc"
,
e
))
t1
=
threading
.
Thread
(
target
=
thread
)
t2
=
threading
.
Thread
(
target
=
thread
)
t1
.
start
()
# this avoids the race condition
# import time
# time.sleep(1)
t2
.
start
()
t1
.
join
()
t2
.
join
()
r1
=
results
.
get
(
block
=
False
)
r2
=
results
.
get
(
block
=
False
)
with
pytest
.
raises
(
queue
.
Empty
):
results
.
get
(
block
=
False
)
assert
r1
[
0
]
!=
r2
[
0
]
assert
r1
[
1
]
==
"data"
,
"Got exception
%r
in Thread
%s
"
%
(
r1
[
2
],
r1
[
0
])
assert
r2
[
1
]
==
"data"
,
"Got exception
%r
in Thread
%s
"
%
(
r2
[
2
],
r2
[
0
])
@pytest.mark.db
class
TestPgStorage
:
"""This class is dedicated for the rare case where the schema needs to
be altered dynamically.
Otherwise, the tests could be blocking when ran altogether.
"""
def
test_content_update_with_new_cols
(
self
,
swh_storage
,
sample_data
):
content
,
content2
=
sample_data
.
contents
[:
2
]
swh_storage
.
journal_writer
.
journal
=
None
# TODO, not supported
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"""alter table content
add column test text default null,
add column test2 text default null"""
)
swh_storage
.
content_add
([
content
])
cont
=
content
.
to_dict
()
cont
[
"test"
]
=
"value-1"
cont
[
"test2"
]
=
"value-2"
swh_storage
.
content_update
([
cont
],
keys
=
[
"test"
,
"test2"
])
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"""SELECT sha1, sha1_git, sha256, length, status,
test, test2
FROM content WHERE sha1 = %s"""
,
(
cont
[
"sha1"
],),
)
datum
=
cur
.
fetchone
()
assert
datum
==
(
cont
[
"sha1"
],
cont
[
"sha1_git"
],
cont
[
"sha256"
],
cont
[
"length"
],
"visible"
,
cont
[
"test"
],
cont
[
"test2"
],
)
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"""alter table content drop column test,
drop column test2"""
)
def
test_content_add_db
(
self
,
swh_storage
,
sample_data
):
content
=
sample_data
.
content
actual_result
=
swh_storage
.
content_add
([
content
])
assert
actual_result
==
{
"content:add"
:
1
,
"content:add:bytes"
:
content
.
length
,
}
if
hasattr
(
swh_storage
,
"objstorage"
):
assert
content
.
sha1
in
swh_storage
.
objstorage
.
objstorage
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"SELECT sha1, sha1_git, sha256, length, status"
" FROM content WHERE sha1 =
%s
"
,
(
content
.
sha1
,),
)
datum
=
cur
.
fetchone
()
assert
datum
==
(
content
.
sha1
,
content
.
sha1_git
,
content
.
sha256
,
content
.
length
,
"visible"
,
)
contents
=
[
obj
for
(
obj_type
,
obj
)
in
swh_storage
.
journal_writer
.
journal
.
objects
if
obj_type
==
"content"
]
assert
len
(
contents
)
==
1
assert
contents
[
0
]
==
attr
.
evolve
(
content
,
data
=
None
)
def
test_content_add_metadata_db
(
self
,
swh_storage
,
sample_data
):
content
=
attr
.
evolve
(
sample_data
.
content
,
data
=
None
,
ctime
=
now
())
actual_result
=
swh_storage
.
content_add_metadata
([
content
])
assert
actual_result
==
{
"content:add"
:
1
,
}
if
hasattr
(
swh_storage
,
"objstorage"
):
assert
content
.
sha1
not
in
swh_storage
.
objstorage
.
objstorage
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"SELECT sha1, sha1_git, sha256, length, status"
" FROM content WHERE sha1 =
%s
"
,
(
content
.
sha1
,),
)
datum
=
cur
.
fetchone
()
assert
datum
==
(
content
.
sha1
,
content
.
sha1_git
,
content
.
sha256
,
content
.
length
,
"visible"
,
)
contents
=
[
obj
for
(
obj_type
,
obj
)
in
swh_storage
.
journal_writer
.
journal
.
objects
if
obj_type
==
"content"
]
assert
len
(
contents
)
==
1
assert
contents
[
0
]
==
content
def
test_skipped_content_add_db
(
self
,
swh_storage
,
sample_data
):
content
,
cont2
=
sample_data
.
skipped_contents
[:
2
]
content2
=
attr
.
evolve
(
cont2
,
blake2s256
=
None
)
actual_result
=
swh_storage
.
skipped_content_add
([
content
,
content
,
content2
])
assert
2
<=
actual_result
.
pop
(
"skipped_content:add"
)
<=
3
assert
actual_result
==
{}
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"SELECT sha1, sha1_git, sha256, blake2s256, "
"length, status, reason "
"FROM skipped_content ORDER BY sha1_git"
)
dbdata
=
cur
.
fetchall
()
assert
len
(
dbdata
)
==
2
assert
dbdata
[
0
]
==
(
content
.
sha1
,
content
.
sha1_git
,
content
.
sha256
,
content
.
blake2s256
,
content
.
length
,
"absent"
,
"Content too long"
,
)
assert
dbdata
[
1
]
==
(
content2
.
sha1
,
content2
.
sha1_git
,
content2
.
sha256
,
content2
.
blake2s256
,
content2
.
length
,
"absent"
,
"Content too long"
,
)
def
test_revision_get_displayname_behavior
(
self
,
swh_storage
,
sample_data
):
"""Check revision_get behavior when displayname is set"""
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
# Make authors and committers known
revision
=
attr
.
evolve
(
revision
,
author
=
Person
.
from_fullname
(
b
"author1 <author1@example.com>"
),
committer
=
Person
.
from_fullname
(
b
"committer1 <committer1@example.com>"
),
)
revision
=
attr
.
evolve
(
revision
,
id
=
revision
.
compute_hash
())
revision2
=
attr
.
evolve
(
revision2
,
author
=
Person
.
from_fullname
(
b
"author2 <author2@example.com>"
),
committer
=
Person
.
from_fullname
(
b
"committer2 <committer2@example.com>"
),
)
revision2
=
attr
.
evolve
(
revision2
,
id
=
revision2
.
compute_hash
())
add_result
=
swh_storage
.
revision_add
([
revision
,
revision2
])
assert
add_result
==
{
"revision:add"
:
2
}
# Before displayname change
revisions
=
swh_storage
.
revision_get
([
revision
.
id
,
revision2
.
id
])
assert
revisions
==
[
revision
,
revision2
]
displayname
=
b
"Display Name <displayname@example.com>"
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"UPDATE person set displayname =
%s
where fullname =
%s
"
,
(
displayname
,
revision
.
author
.
fullname
),
)
revisions
=
swh_storage
.
revision_get
([
revision
.
id
,
revision2
.
id
])
assert
revisions
==
[
attr
.
evolve
(
revision
,
author
=
Person
.
from_fullname
(
displayname
)),
revision2
,
]
revisions
=
swh_storage
.
revision_get
(
[
revision
.
id
,
revision2
.
id
],
ignore_displayname
=
True
)
assert
revisions
==
[
revision
,
revision2
]
def
test_revision_log_displayname_behavior
(
self
,
swh_storage
,
sample_data
):
"""Check revision_log behavior when displayname is set"""
revision
,
revision2
=
sample_data
.
revisions
[:
2
]
# Make authors, committers and parenthood relationship known
# (revision2 -[parent]-> revision1)
revision
=
attr
.
evolve
(
revision
,
author
=
Person
.
from_fullname
(
b
"author1 <author1@example.com>"
),
committer
=
Person
.
from_fullname
(
b
"committer1 <committer1@example.com>"
),
)
revision
=
attr
.
evolve
(
revision
,
id
=
revision
.
compute_hash
())
revision2
=
attr
.
evolve
(
revision2
,
parents
=
(
revision
.
id
,),
author
=
Person
.
from_fullname
(
b
"author2 <author2@example.com>"
),
committer
=
Person
.
from_fullname
(
b
"committer2 <committer2@example.com>"
),
)
revision2
=
attr
.
evolve
(
revision2
,
id
=
revision2
.
compute_hash
())
add_result
=
swh_storage
.
revision_add
([
revision
,
revision2
])
assert
add_result
==
{
"revision:add"
:
2
}
# Before displayname change
revisions
=
swh_storage
.
revision_log
([
revision2
.
id
])
assert
list
(
revisions
)
==
[
revision2
.
to_dict
(),
revision
.
to_dict
()]
displayname
=
b
"Display Name <displayname@example.com>"
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"UPDATE person set displayname =
%s
where fullname =
%s
"
,
(
displayname
,
revision
.
author
.
fullname
),
)
revisions
=
swh_storage
.
revision_log
([
revision2
.
id
])
assert
list
(
revisions
)
==
[
revision2
.
to_dict
(),
attr
.
evolve
(
revision
,
author
=
Person
.
from_fullname
(
displayname
))
.
to_dict
(),
]
revisions
=
swh_storage
.
revision_log
([
revision2
.
id
],
ignore_displayname
=
True
)
assert
list
(
revisions
)
==
[
revision2
.
to_dict
(),
revision
.
to_dict
()]
def
test_release_get_displayname_behavior
(
self
,
swh_storage
,
sample_data
):
"""Check release_get behavior when displayname is set"""
release
,
release2
=
sample_data
.
releases
[:
2
]
# Make authors known
release
=
attr
.
evolve
(
release
,
author
=
Person
.
from_fullname
(
b
"author1 <author1@example.com>"
),
)
release
=
attr
.
evolve
(
release
,
id
=
release
.
compute_hash
())
release2
=
attr
.
evolve
(
release2
,
author
=
Person
.
from_fullname
(
b
"author2 <author2@example.com>"
),
)
release2
=
attr
.
evolve
(
release2
,
id
=
release2
.
compute_hash
())
add_result
=
swh_storage
.
release_add
([
release
,
release2
])
assert
add_result
==
{
"release:add"
:
2
}
# Before displayname change
releases
=
swh_storage
.
release_get
([
release
.
id
,
release2
.
id
])
assert
releases
==
[
release
,
release2
]
displayname
=
b
"Display Name <displayname@example.com>"
with
db_transaction
(
swh_storage
)
as
(
_
,
cur
):
cur
.
execute
(
"UPDATE person set displayname =
%s
where fullname =
%s
"
,
(
displayname
,
release
.
author
.
fullname
),
)
releases
=
swh_storage
.
release_get
([
release
.
id
,
release2
.
id
])
assert
releases
==
[
attr
.
evolve
(
release
,
author
=
Person
.
from_fullname
(
displayname
)),
release2
,
]
releases
=
swh_storage
.
release_get
(
[
release
.
id
,
release2
.
id
],
ignore_displayname
=
True
)
assert
releases
==
[
release
,
release2
]
def
test_clear_buffers
(
self
,
swh_storage
):
"""Calling clear buffers on real storage does nothing"""
assert
swh_storage
.
clear_buffers
()
is
None
def
test_flush
(
self
,
swh_storage
):
"""Calling clear buffers on real storage does nothing"""
assert
swh_storage
.
flush
()
==
{}
def
test_dbversion
(
self
,
swh_storage
):
with
swh_storage
.
db
()
as
db
:
assert
db
.
check_dbversion
()
def
test_dbversion_mismatch
(
self
,
swh_storage
,
monkeypatch
):
monkeypatch
.
setattr
(
Db
,
"current_version"
,
-
1
)
with
swh_storage
.
db
()
as
db
:
assert
db
.
check_dbversion
()
is
False
def
test_check_config
(
self
,
swh_storage
):
assert
swh_storage
.
check_config
(
check_write
=
True
)
assert
swh_storage
.
check_config
(
check_write
=
False
)
def
test_check_config_dbversion
(
self
,
swh_storage
,
monkeypatch
):
monkeypatch
.
setattr
(
Db
,
"current_version"
,
-
1
)
assert
swh_storage
.
check_config
(
check_write
=
True
)
is
False
assert
swh_storage
.
check_config
(
check_write
=
False
)
is
False
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 7:42 AM (10 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3264240
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment