Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8392649
test_opam.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
test_opam.py
View Options
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
swh.loader.package.opam.loader
import
OpamLoader
,
OpamPackageInfo
from
swh.loader.tests
import
assert_last_visit_matches
,
check_snapshot
,
get_stats
from
swh.model.hashutil
import
hash_to_bytes
from
swh.model.model
import
Person
,
Snapshot
,
SnapshotBranch
,
TargetType
def test_opam_loader_no_opam_repository_fails(swh_storage, tmpdir, datadir):
    """Check that loading fails when the loader must not set up the opam root."""
    opam_package = "agrid"
    opam_url = f"file://{datadir}/fake_opam_repo"
    origin_url = f"opam+{opam_url}/packages/{opam_package}"

    # The loader is told not to initialize the opam root, and this test does
    # not prepare one in ``tmpdir`` either, so the load is expected to fail.
    # This mirrors the intended production setup, where the workers' opam root
    # is maintained externally rather than by the loader.
    load_result = OpamLoader(
        swh_storage,
        origin_url,
        tmpdir,  # opam root: exists as a directory but is never initialized
        "loadertest",  # opam instance name
        opam_url,
        opam_package,
        initialize_opam_root=False,
    ).load()

    assert load_result == {"status": "failed"}
def test_opam_loader_one_version(tmpdir, requests_mock_datadir, datadir, swh_storage):
    """Load the ``agrid`` package and verify snapshot, visit and storage stats."""
    opam_package = "agrid"
    opam_url = f"file://{datadir}/fake_opam_repo"
    origin_url = f"opam+{opam_url}/packages/{opam_package}"

    opam_loader = OpamLoader(
        swh_storage,
        origin_url,
        tmpdir,  # opam root, initialized by the loader itself in this test
        "loadertest",  # opam instance name
        opam_url,
        opam_package,
        initialize_opam_root=True,
    )
    load_result = opam_loader.load()

    snapshot_id = hash_to_bytes("4e4bf977312460329d7f769b0be89937c9827efc")
    assert load_result == {
        "status": "eventful",
        "snapshot_id": snapshot_id.hex(),
    }

    # The expected snapshot holds one version branch pointing at a revision,
    # plus a HEAD alias pointing at that version branch.
    head_branch = SnapshotBranch(
        target=b"agrid.0.1",
        target_type=TargetType.ALIAS,
    )
    version_branch = SnapshotBranch(
        target=b"S\x8c\x8aq\xdcy\xa4/0\xa0\xb2j\xeb\xc1\x16\xad\xce\x06\xeaV",
        target_type=TargetType.REVISION,
    )
    check_snapshot(
        Snapshot(
            id=snapshot_id,
            branches={b"HEAD": head_branch, b"agrid.0.1": version_branch},
        ),
        swh_storage,
    )

    assert_last_visit_matches(
        swh_storage,
        origin_url,
        status="full",
        type="opam",
        snapshot=snapshot_id,
    )

    # Object counts reported by the storage after this single load.
    assert get_stats(swh_storage) == {
        "content": 18,
        "directory": 8,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 1,
        "skipped_content": 0,
        "snapshot": 1,
    }
def test_opam_loader_many_version(tmpdir, requests_mock_datadir, datadir, swh_storage):
    """Load the ``directories`` package and check its multi-version snapshot.

    The expected snapshot has one revision branch per version (0.1, 0.2 and
    0.3) and a ``HEAD`` alias pointing at ``directories.0.3``. The last visit
    of the origin must be a "full" visit of type "opam" for that snapshot.
    """
    # The opam repository is read from the test data directory.
    opam_url = f"file://{datadir}/fake_opam_repo"
    opam_root = tmpdir
    opam_instance = "loadertest"
    opam_package = "directories"
    url = f"opam+{opam_url}/packages/{opam_package}"
    loader = OpamLoader(
        swh_storage,
        url,
        opam_root,
        opam_instance,
        opam_url,
        opam_package,
        initialize_opam_root=True,
    )
    actual_load_status = loader.load()
    expected_snapshot_id = hash_to_bytes("1b49be175dcf17c0f568bcd7aac3d4faadc41249")
    assert actual_load_status == {
        "status": "eventful",
        "snapshot_id": expected_snapshot_id.hex(),
    }
    expected_snapshot = Snapshot(
        id=expected_snapshot_id,
        branches={
            b"HEAD": SnapshotBranch(
                target=b"directories.0.3",
                target_type=TargetType.ALIAS,
            ),
            b"directories.0.1": SnapshotBranch(
                target=b"N\x92jA\xb2\x892\xeb\xcc\x9c\xa9\xb3\xea\xa7kz\xb08\xa6V",
                target_type=TargetType.REVISION,
            ),
            b"directories.0.2": SnapshotBranch(
                target=b"yj\xc9\x1a\x8f\xe0\xaa\xff[\x88\xffz"
                b"\x91C\xcc\x96\xb7\xd4\xf65",
                target_type=TargetType.REVISION,
            ),
            # NOTE(review): as it appears in this copy of the file, the literal
            # below is 19 bytes long, while the other revision targets here are
            # 20 bytes. A whitespace byte may have been lost when the file was
            # extracted -- confirm against the upstream source before relying
            # on this value.
            b"directories.0.3": SnapshotBranch(
                target=b"hA\xc4\xb5\x18A8\xb8C\x12\xa3\xa5T\xb7/v\x85X\xcb",
                target_type=TargetType.REVISION,
            ),
        },
    )
    check_snapshot(expected_snapshot, swh_storage)
    assert_last_visit_matches(
        swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id
    )
def test_opam_revision(tmpdir, requests_mock_datadir, swh_storage, datadir):
    """Load the ``ocb`` package and check package info and revision authorship."""
    opam_package = "ocb"
    opam_url = f"file://{datadir}/fake_opam_repo"
    origin_url = f"opam+{opam_url}/packages/{opam_package}"

    opam_loader = OpamLoader(
        swh_storage,
        origin_url,
        tmpdir,  # opam root, initialized by the loader itself in this test
        "loadertest",  # opam instance name
        opam_url,
        opam_package,
        initialize_opam_root=True,
    )
    load_result = opam_loader.load()

    snapshot_id = hash_to_bytes("398df115b9feb2f463efd21941d69b7d59cd9025")
    assert load_result == {
        "status": "eventful",
        "snapshot_id": snapshot_id.hex(),
    }

    # Only the first (branch name, package info) pair for version 0.1 is checked.
    found_branch, found_info = next(opam_loader.get_package_info("0.1"))

    # Author and committer are expected to be the same person, known only by
    # its full name (no separate name/email parts).
    maintainer = Person(
        fullname=b"OCamlPro <contact@ocamlpro.com>",
        name=None,
        email=None,
    )
    wanted_info = OpamPackageInfo(
        url="https://github.com/OCamlPro/ocb/archive/0.1.tar.gz",
        filename=None,
        directory_extrinsic_metadata=[],
        author=maintainer,
        committer=maintainer,
        version="0.1",
    )

    assert found_branch == "ocb.0.1"
    assert found_info == wanted_info

    # The revision stored by the load must carry the same authorship.
    stored_revisions = swh_storage.revision_get(
        [b"o\xad\x7f=\x07\xbb\xaah\xdbI(\xb0'\x10z\xfc\xff\x06x\x1b"]
    )
    stored_revision = stored_revisions[0]

    assert stored_revision is not None
    assert stored_revision.author == wanted_info.author
    assert stored_revision.committer == wanted_info.committer
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:01 PM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3296092
Attached To
rDLDBASE Generic VCS/Package Loader
Event Timeline
Log In to Comment