# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from functools import reduce
import re
from typing import Any, Dict, List
from unittest.mock import patch

import attr
from click.testing import CliRunner
from confluent_kafka import Consumer
import pytest

from swh.indexer import fossology_license
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
    ContentLicenseRow,
    ContentMimetypeRow,
    DirectoryIntrinsicMetadataRow,
    OriginExtrinsicMetadataRow,
    OriginIntrinsicMetadataRow,
)
from swh.journal.writer import get_journal_writer
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Content, Origin, OriginVisitStatus

from .test_metadata import REMD
from .utils import (
    DIRECTORY2,
    RAW_CONTENT_IDS,
    RAW_CONTENTS,
    REVISION,
    SHA1_TO_LICENSES,
    mock_compute_license,
)


def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
    """Registers two indexer tools and adds ``nb_rows`` rows of origin and
    directory intrinsic metadata to ``idx_storage``; returns the ids of the
    registered tools."""
    tools: List[Dict[str, Any]] = [
        {
            "tool_name": "tool %d" % i,
            "tool_version": "0.0.1",
            "tool_configuration": {},
        }
        for i in range(2)
    ]
    tools = idx_storage.indexer_configuration_add(tools)
    origin_metadata = [
        OriginIntrinsicMetadataRow(
            id="file://dev/%04d" % origin_id,
            from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)),
            indexer_configuration_id=tools[origin_id % 2]["id"],
            metadata={"name": "origin %d" % origin_id},
            mappings=["mapping%d" % (origin_id % 10)],
        )
        for origin_id in range(nb_rows)
    ]
    directory_metadata = [
        DirectoryIntrinsicMetadataRow(
            id=hash_to_bytes("abcd{:0>36}".format(origin_id)),
            indexer_configuration_id=tools[origin_id % 2]["id"],
            metadata={"name": "origin %d" % origin_id},
            mappings=["mapping%d" % (origin_id % 10)],
        )
        for origin_id in range(nb_rows)
    ]

    idx_storage.directory_intrinsic_metadata_add(directory_metadata)
    idx_storage.origin_intrinsic_metadata_add(origin_metadata)

    return [tool["id"] for tool in tools]


def _origins_in_task_args(tasks):
    """Returns the set of origins contained in the arguments of the
    provided tasks (assumed to be of type index-origin-metadata)."""
    return reduce(
        set.union,
        (set(task["arguments"]["args"][0]) for task in tasks),
        set(),
    )


def _assert_tasks_for_origins(tasks, origins):
    expected_kwargs = {}
    assert {task["type"] for task in tasks} == {"index-origin-metadata"}
    assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
    for task in tasks:
        assert task["arguments"]["kwargs"] == expected_kwargs, task
    assert _origins_in_task_args(tasks) == set(
        ["file://dev/%04d" % i for i in origins]
    )


@pytest.fixture
def cli_runner():
    return CliRunner()


def test_cli_mapping_list(cli_runner, swh_config):
    result = cli_runner.invoke(
        indexer_cli_group,
        ["-C", swh_config, "mapping", "list"],
        catch_exceptions=False,
    )
    expected_output = "\n".join(
        [
            "cff",
            "codemeta",
            "composer",
            "gemspec",
            "github",
            "maven",
            "npm",
            "pkg-info",
            "pubspec",
            "",
        ]
        # must be sorted for test to pass
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output


def test_cli_mapping_list_terms(cli_runner, swh_config):
    result = cli_runner.invoke(
        indexer_cli_group,
        ["-C", swh_config, "mapping", "list-terms"],
        catch_exceptions=False,
    )
    assert result.exit_code == 0, result.output
    assert re.search(r"http://schema.org/url:\n.*npm", result.output)
    assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
    assert re.search(
        r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
        result.output,
    )


def test_cli_mapping_list_terms_exclude(cli_runner, swh_config):
    result = cli_runner.invoke(
        indexer_cli_group,
        ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"],
        catch_exceptions=False,
    )
    assert result.exit_code == 0, result.output
    assert re.search(r"http://schema.org/url:\n.*npm", result.output)
    assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
    assert not re.search(
        r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
        result.output,
    )
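

# Editorial note on the reindex tests below: they patch the scheduler's
# TASK_BATCH_SIZE down to 3, so tasks are flushed to the scheduler (and a
# progress line printed) every 3 tasks; combined with the batch size of 10
# origins per task used here, this produces the "Scheduled N tasks (M origins)."
# lines asserted in expected_output. Both the swh.scheduler.cli.utils and the
# older swh.scheduler.cli_utils module paths are patched, presumably so the
# tests keep working across scheduler versions.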


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_empty_db(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
        ],
        catch_exceptions=False,
    )
    expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 0


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_divisor(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests the re-indexing when origin_batch_size*task_batch_size is a
    divisor of nb_origins."""
    fill_idx_storage(idx_storage, 90)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (30 origins).\n"
        "Scheduled 6 tasks (60 origins).\n"
        "Scheduled 9 tasks (90 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 9
    _assert_tasks_for_origins(tasks, range(90))


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_dry_run(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests that --dry-run reports the tasks that would be scheduled
    without actually submitting any of them to the scheduler."""
    fill_idx_storage(idx_storage, 90)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "--dry-run",
            "reindex_origin_metadata",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (30 origins).\n"
        "Scheduled 6 tasks (60 origins).\n"
        "Scheduled 9 tasks (90 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 0


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_nondivisor(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests the re-indexing when neither origin_batch_size nor
    task_batch_size is a divisor of nb_origins."""
    fill_idx_storage(idx_storage, 70)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
            "--batch-size",
            "20",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (60 origins).\n"
        "Scheduled 4 tasks (70 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 4
    _assert_tasks_for_origins(tasks, range(70))


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_mapping(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests the re-indexing when only origins whose metadata used a single
    given mapping are selected."""
    fill_idx_storage(idx_storage, 110)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
            "--mapping",
            "mapping1",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 2
    _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_two_mappings(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests the re-indexing when only origins whose metadata used one of two
    given mappings are selected."""
    fill_idx_storage(idx_storage, 110)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "--config-file",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
            "--mapping",
            "mapping1",
            "--mapping",
            "mapping2",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 3
    _assert_tasks_for_origins(
        tasks,
        [
            1,
            11,
            21,
            31,
            41,
            51,
            61,
            71,
            81,
            91,
            101,
            2,
            12,
            22,
            32,
            42,
            52,
            62,
            72,
            82,
            92,
            102,
        ],
    )


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_tool(
    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
    """Tests the re-indexing when only origins indexed with a single given
    tool are selected."""
    tool_ids = fill_idx_storage(idx_storage, 110)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "schedule",
            "reindex_origin_metadata",
            "--tool-id",
            str(tool_ids[0]),
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (30 origins).\n"
        "Scheduled 6 tasks (55 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 6
    _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])


def now():
    return datetime.datetime.now(tz=datetime.timezone.utc)
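

# Editorial note on the journal-client tests below: each one uses
# swh.journal.writer.get_journal_writer to produce objects (origin visit
# statuses, contents, raw extrinsic metadata) on a test Kafka topic, then runs
# "swh indexer journal-client" against that broker with --stop-after-objects so
# the command terminates once everything written above has been consumed, and
# finally asserts on what was scheduled or written to the indexer storage. Only
# visit statuses with status "full" are expected to trigger origin indexing.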


def test_cli_journal_client_schedule(
    cli_runner,
    swh_config,
    indexer_scheduler,
    kafka_prefix: str,
    kafka_server,
    consumer: Consumer,
):
    """Test the 'swh indexer journal-client' cli tool."""
    journal_writer = get_journal_writer(
        "kafka",
        brokers=[kafka_server],
        prefix=kafka_prefix,
        client_id="test producer",
        value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something is going wrong
    )
    visit_statuses = [
        OriginVisitStatus(
            origin="file:///dev/zero",
            visit=1,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///dev/foobar",
            visit=2,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///tmp/spamegg",
            visit=3,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///dev/0002",
            visit=6,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            # will be filtered out due to its 'partial' status
            origin="file:///dev/0000",
            visit=4,
            date=now(),
            status="partial",
            snapshot=None,
        ),
        OriginVisitStatus(
            # will be filtered out due to its 'ongoing' status
            origin="file:///dev/0001",
            visit=5,
            date=now(),
            status="ongoing",
            snapshot=None,
        ),
    ]

    journal_writer.write_additions("origin_visit_status", visit_statuses)
    visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "journal-client",
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
            "--stop-after-objects",
            len(visit_statuses),
            "--origin-metadata-task-type",
            "index-origin-metadata",
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")

    # This can be split into multiple tasks but no more than the origin-visit-statuses
    # written in the journal
    assert len(tasks) <= len(visit_statuses_full)

    actual_origins = []
    for task in tasks:
        actual_task = dict(task)
        assert actual_task["type"] == "index-origin-metadata"
        scheduled_origins = actual_task["arguments"]["args"][0]
        actual_origins.extend(scheduled_origins)

    assert set(actual_origins) == {vs.origin for vs in visit_statuses_full}


def test_cli_journal_client_without_brokers(
    cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer
):
    """Without brokers configuration, the cli fails."""

    with pytest.raises(ValueError, match="brokers"):
        cli_runner.invoke(
            indexer_cli_group,
            [
                "-C",
                swh_config,
                "journal-client",
            ],
            catch_exceptions=False,
        )
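

# Editorial note: the parametrized tests below pass the indexer name
# positionally to "journal-client"; the "*" value is expected to select every
# available indexer, so the same assertions must hold whether the specific
# indexer name or the wildcard is used.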


@pytest.mark.parametrize("indexer_name", ["origin_intrinsic_metadata", "*"])
def test_cli_journal_client_index__origin_intrinsic_metadata(
    cli_runner,
    swh_config,
    kafka_prefix: str,
    kafka_server,
    consumer: Consumer,
    idx_storage,
    storage,
    mocker,
    swh_indexer_config,
    indexer_name: str,
):
    """Test the 'swh indexer journal-client' cli tool."""
    journal_writer = get_journal_writer(
        "kafka",
        brokers=[kafka_server],
        prefix=kafka_prefix,
        client_id="test producer",
        value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something is going wrong
    )
    visit_statuses = [
        OriginVisitStatus(
            origin="file:///dev/zero",
            visit=1,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///dev/foobar",
            visit=2,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///tmp/spamegg",
            visit=3,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            origin="file:///dev/0002",
            visit=6,
            date=now(),
            status="full",
            snapshot=None,
        ),
        OriginVisitStatus(
            # will be filtered out due to its 'partial' status
            origin="file:///dev/0000",
            visit=4,
            date=now(),
            status="partial",
            snapshot=None,
        ),
        OriginVisitStatus(
            # will be filtered out due to its 'ongoing' status
            origin="file:///dev/0001",
            visit=5,
            date=now(),
            status="ongoing",
            snapshot=None,
        ),
    ]

    journal_writer.write_additions("origin_visit_status", visit_statuses)
    visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
    storage.revision_add([REVISION])

    mocker.patch(
        "swh.indexer.metadata.get_head_swhid",
        return_value=REVISION.swhid(),
    )
    mocker.patch(
        "swh.indexer.metadata.DirectoryMetadataIndexer.index",
        return_value=[
            DirectoryIntrinsicMetadataRow(
                id=DIRECTORY2.id,
                indexer_configuration_id=1,
                mappings=["cff"],
                metadata={"foo": "bar"},
            )
        ],
    )

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "journal-client",
            indexer_name,
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
            "--stop-after-objects",
            len(visit_statuses),
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    results = idx_storage.origin_intrinsic_metadata_get(
        [status.origin for status in visit_statuses]
    )
    expected_results = [
        OriginIntrinsicMetadataRow(
            id=status.origin,
            from_directory=DIRECTORY2.id,
            tool={"id": 1, **swh_indexer_config["tools"]},
            mappings=["cff"],
            metadata={"foo": "bar"},
        )
        for status in sorted(visit_statuses_full, key=lambda r: r.origin)
    ]
    assert sorted(results, key=lambda r: r.id) == expected_results


@pytest.mark.parametrize("indexer_name", ["extrinsic_metadata", "*"])
def test_cli_journal_client_index__origin_extrinsic_metadata(
    cli_runner,
    swh_config,
    kafka_prefix: str,
    kafka_server,
    consumer: Consumer,
    idx_storage,
    storage,
    mocker,
    swh_indexer_config,
    indexer_name: str,
):
    """Test the 'swh indexer journal-client' cli tool."""
    journal_writer = get_journal_writer(
        "kafka",
        brokers=[kafka_server],
        prefix=kafka_prefix,
        client_id="test producer",
        value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something is going wrong
    )

    origin = Origin("http://example.org/repo.git")
    storage.origin_add([origin])
    raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid())
    raw_extrinsic_metadata = attr.evolve(
        raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash()
    )
    journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata])

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "journal-client",
            indexer_name,
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
            "--stop-after-objects",
            1,
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    results = idx_storage.origin_extrinsic_metadata_get([origin.url])
    expected_results = [
        OriginExtrinsicMetadataRow(
            id=origin.url,
            from_remd_id=raw_extrinsic_metadata.id,
            tool={"id": 1, **swh_indexer_config["tools"]},
            mappings=["github"],
            metadata={
                "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                "type": "https://forgefed.org/ns#Repository",
                "name": "test software",
            },
        )
    ]
    assert sorted(results, key=lambda r: r.id) == expected_results


def test_cli_journal_client_index__content_mimetype(
    cli_runner,
    swh_config,
    kafka_prefix: str,
    kafka_server,
    consumer: Consumer,
    idx_storage,
    obj_storage,
    storage,
    mocker,
    swh_indexer_config,
):
    """Test the 'swh indexer journal-client' cli tool."""
    journal_writer = get_journal_writer(
        "kafka",
        brokers=[kafka_server],
        prefix=kafka_prefix,
        client_id="test producer",
        value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something is going wrong
    )

    contents = []
    expected_results = []
    content_ids = []
    for content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items():
        content = Content.from_data(raw_content)
        assert content_id == content.sha1

        contents.append(content)
        content_ids.append(content_id)

        # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3)
        # return different results. This allows dealing with such cases when the tests
        # run in different environments (e.g. ci tox, ci debian, dev machine, ...)
        all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes]

        expected_results.extend(
            [
                ContentMimetypeRow(
                    id=content.sha1,
                    tool={"id": 1, **swh_indexer_config["tools"]},
                    mimetype=mimetype,
                    encoding=encoding,
                )
                for mimetype in all_mimetypes
            ]
        )

    assert len(contents) == len(RAW_CONTENTS)

    journal_writer.write_additions("content", contents)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "journal-client",
            "content_mimetype",
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
            "--stop-after-objects",
            len(contents),
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    results = idx_storage.content_mimetype_get(content_ids)
    assert len(results) == len(contents)
    for result in results:
        assert result in expected_results


def test_cli_journal_client_index__fossology_license(
    cli_runner,
    swh_config,
    kafka_prefix: str,
    kafka_server,
    consumer: Consumer,
    idx_storage,
    obj_storage,
    storage,
    mocker,
    swh_indexer_config,
):
    """Test the 'swh indexer journal-client' cli tool."""

    # Patch the actual license computation with the mock from .utils
    fossology_license.compute_license = mock_compute_license

    journal_writer = get_journal_writer(
        "kafka",
        brokers=[kafka_server],
        prefix=kafka_prefix,
        client_id="test producer",
        value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something is going wrong
    )

    tool = {"id": 1, **swh_indexer_config["tools"]}

    id0, id1, id2 = RAW_CONTENT_IDS

    contents = []
    content_ids = []
    expected_results = []
    for content_id, (raw_content, _, _) in RAW_CONTENTS.items():
        content = Content.from_data(raw_content)
        assert content_id == content.sha1

        contents.append(content)
        content_ids.append(content_id)

        expected_results.extend(
            [
                ContentLicenseRow(id=content_id, tool=tool, license=license)
                for license in SHA1_TO_LICENSES[content_id]
            ]
        )

    assert len(contents) == len(RAW_CONTENTS)

    journal_writer.write_additions("content", contents)

    result = cli_runner.invoke(
        indexer_cli_group,
        [
            "-C",
            swh_config,
            "journal-client",
            "content_fossology_license",
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
            "--stop-after-objects",
            len(contents),
        ],
        catch_exceptions=False,
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    results = idx_storage.content_fossology_license_get(content_ids)
    assert len(results) == len(expected_results)
    for result in results:
        assert result in expected_results
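

# A minimal way to run this module locally (assuming a checkout of swh-indexer
# with its test dependencies, including the Kafka test fixtures, installed):
#
#   pytest -x swh/indexer/tests/test_cli.py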