Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9339066
test_to_disk.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
test_to_disk.py
View Options
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
pytest
from
swh.model.from_disk
import
DentryPerms
from
swh.model.model
import
Content
,
Directory
,
DirectoryEntry
,
SkippedContent
from
swh.vault.to_disk
import
DirectoryBuilder
,
get_filtered_files_content
def test_get_filtered_files_content(swh_storage):
    """Check the entries yielded for one visible and one absent file.

    A regular content and a skipped content are added to the storage, then
    ``get_filtered_files_content`` is asked for both.  Each returned entry is
    expected to be the input entry plus a ``"content"`` key: the real data for
    the visible file, and a fixed placeholder message for the absent one.
    """
    present = Content.from_data(b"foo bar")
    absent = SkippedContent(
        sha1=None,
        sha1_git=b"c" * 20,
        sha256=None,
        blake2s256=None,
        length=42,
        status="absent",
        reason="for some reason",
    )
    swh_storage.content_add([present])
    swh_storage.skipped_content_add([absent])

    files_data = [
        dict(
            status="visible",
            sha1=present.sha1,
            sha1_git=present.sha1_git,
            target=present.sha1_git,
        ),
        dict(status="absent", target=absent.sha1_git),
    ]

    actual = list(get_filtered_files_content(swh_storage, files_data))

    # Placeholder expected in place of the data of a skipped content.
    placeholder = (
        b"This content has not been retrieved in the "
        b"Software Heritage archive due to its size."
    )
    expected = [
        dict(
            content=present.data,
            status="visible",
            sha1=present.sha1,
            sha1_git=present.sha1_git,
            target=present.sha1_git,
        ),
        dict(content=placeholder, status="absent", target=absent.sha1_git),
    ]
    assert actual == expected
def test_get_filtered_files_content__unknown_status(swh_storage):
    """Check that an entry with an unrecognized status is rejected.

    The second entry carries the status ``"blah"``; consuming the result of
    ``get_filtered_files_content`` is expected to raise an ``AssertionError``
    whose message matches ``unexpected status 'blah'``.
    """
    known = Content.from_data(b"foo bar")
    swh_storage.content_add([known])

    visible_entry = dict(
        status="visible",
        sha1=known.sha1,
        sha1_git=known.sha1_git,
        target=known.sha1_git,
    )
    bogus_entry = dict(status="blah", target=b"c" * 20)
    files_data = [visible_entry, bogus_entry]

    with pytest.raises(AssertionError, match="unexpected status 'blah'"):
        # Consume the whole result so that every entry gets processed.
        for _ in get_filtered_files_content(swh_storage, files_data):
            pass
def _fill_storage(swh_storage, exclude_cnt3=False, exclude_dir1=False):
    """Add a small two-level directory tree to ``swh_storage``.

    The tree looks like this::

        dir2/
            content3        -> b"baz qux"
            subdirectory/   -> dir1
                content1    -> b"foo bar"
                content2    -> b"bar baz"

    Args:
        swh_storage: storage receiving the contents and directories.
        exclude_cnt3: when true, ``content3``'s content object is not added
            to the storage (the directory entry still references it).
        exclude_dir1: when true, the ``subdirectory`` directory object is not
            added to the storage (``dir2`` still references it).

    Returns:
        The root ``Directory`` object (``dir2``).
    """
    cnt1, cnt2, cnt3 = (
        Content.from_data(data) for data in (b"foo bar", b"bar baz", b"baz qux")
    )

    def file_entry(name, cnt):
        # Regular-file entry pointing at the given content.
        return DirectoryEntry(
            name=name,
            type="file",
            target=cnt.sha1_git,
            perms=DentryPerms.content,
        )

    dir1 = Directory(
        entries=(
            file_entry(b"content1", cnt1),
            file_entry(b"content2", cnt2),
        )
    )
    dir2 = Directory(
        entries=(
            file_entry(b"content3", cnt3),
            DirectoryEntry(
                name=b"subdirectory",
                type="dir",
                target=dir1.id,
                perms=DentryPerms.directory,
            ),
        )
    )

    contents = [cnt1, cnt2]
    if not exclude_cnt3:
        contents.append(cnt3)
    swh_storage.content_add(contents)

    directories = [dir2] if exclude_dir1 else [dir1, dir2]
    swh_storage.directory_add(directories)

    return dir2
def test_directory_builder(swh_storage, tmp_path):
    """Build a fully populated directory to disk and check the resulting tree.

    The storage is filled with every content and directory, then
    ``DirectoryBuilder.build()`` is expected to create ``root`` with the
    nested layout and the exact file contents.
    """
    dir2 = _fill_storage(swh_storage)

    root = tmp_path / "root"
    builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id)

    # Nothing is written before build() is called.
    assert not root.exists()

    builder.build()

    assert root.is_dir()
    assert set(root.glob("**/*")) == {
        root / "subdirectory",
        root / "subdirectory" / "content1",
        root / "subdirectory" / "content2",
        root / "content3",
    }

    # read_text() closes the file, unlike a bare open().read() which leaves
    # the handle to the garbage collector and triggers ResourceWarning.
    assert (root / "subdirectory" / "content1").read_text() == "foo bar"
    assert (root / "subdirectory" / "content2").read_text() == "bar baz"
    assert (root / "content3").read_text() == "baz qux"
def test_directory_builder_missing_content(swh_storage, tmp_path):
    """Build a directory whose ``content3`` object is absent from storage.

    The build is still expected to succeed, and the ``content3`` file is
    expected to contain the text ``This content is missing``.
    """
    dir2 = _fill_storage(swh_storage, exclude_cnt3=True)

    root = tmp_path / "root"
    builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id)

    # Nothing is written before build() is called.
    assert not root.exists()

    builder.build()

    assert root.is_dir()

    # read_text() closes the file, unlike a bare open().read() which leaves
    # the handle to the garbage collector and triggers ResourceWarning.
    assert "This content is missing" in (root / "content3").read_text()
def test_directory_builder_missing_directory(swh_storage, tmp_path):
    """Build a directory whose ``subdirectory`` object is absent from storage.

    The build is still expected to succeed: ``subdirectory`` is expected to
    exist with nothing inside it, and ``content3`` keeps its real content.
    """
    dir2 = _fill_storage(swh_storage, exclude_dir1=True)

    root = tmp_path / "root"
    builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id)

    # Nothing is written before build() is called.
    assert not root.exists()

    builder.build()

    assert root.is_dir()
    # Only the two top-level entries exist; nothing sits below subdirectory.
    assert set(root.glob("**/*")) == {
        root / "subdirectory",
        root / "content3",
    }

    # read_text() closes the file, unlike a bare open().read() which leaves
    # the handle to the garbage collector and triggers ResourceWarning.
    assert (root / "content3").read_text() == "baz qux"
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 9:23 AM (5 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3239375
Attached To
rDVAU Software Heritage Vault
Event Timeline
Log In to Comment