# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from functools import reduce
import re
from typing import Any, Dict, List
from unittest.mock import patch
import attr
from click.testing import CliRunner
from confluent_kafka import Consumer
import pytest
from swh.indexer import fossology_license
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
ContentLicenseRow,
ContentMimetypeRow,
DirectoryIntrinsicMetadataRow,
OriginExtrinsicMetadataRow,
OriginIntrinsicMetadataRow,
)
from swh.journal.writer import get_journal_writer
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Content, Origin, OriginVisitStatus
from .test_metadata import REMD
from .utils import (
DIRECTORY2,
RAW_CONTENT_IDS,
RAW_CONTENTS,
REVISION,
SHA1_TO_LICENSES,
mock_compute_license,
)


def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
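    """Pre-fills the indexer storage with ``nb_rows`` origin intrinsic metadata
    rows and as many directory intrinsic metadata rows, alternating between two
    indexer tools.

    Returns the ids of the registered tools."""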
tools: List[Dict[str, Any]] = [
{
"tool_name": "tool %d" % i,
"tool_version": "0.0.1",
"tool_configuration": {},
}
for i in range(2)
]
tools = idx_storage.indexer_configuration_add(tools)
origin_metadata = [
OriginIntrinsicMetadataRow(
id="file://dev/%04d" % origin_id,
from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
directory_metadata = [
DirectoryIntrinsicMetadataRow(
id=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
idx_storage.directory_intrinsic_metadata_add(directory_metadata)
idx_storage.origin_intrinsic_metadata_add(origin_metadata)
return [tool["id"] for tool in tools]


def _origins_in_task_args(tasks):
"""Returns the set of origins contained in the arguments of the
provided tasks (assumed to be of type index-origin-metadata)."""
return reduce(
set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
)


def _assert_tasks_for_origins(tasks, origins):
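    """Checks that all tasks are index-origin-metadata tasks taking a single
    positional argument (a list of origin URLs) and no keyword arguments, and
    that together they cover exactly the given origin ids."""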
expected_kwargs = {}
assert {task["type"] for task in tasks} == {"index-origin-metadata"}
assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
for task in tasks:
assert task["arguments"]["kwargs"] == expected_kwargs, task
assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins])


@pytest.fixture
def cli_runner():
return CliRunner()


def test_cli_mapping_list(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list"],
catch_exceptions=False,
)
expected_output = "\n".join(
[
"cff",
"codemeta",
"composer",
"gemspec",
"github",
"maven",
"npm",
"pkg-info",
"pubspec",
"",
        ]  # must be kept sorted for the test to pass
)
assert result.exit_code == 0, result.output
assert result.output == expected_output


def test_cli_mapping_list_terms(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list-terms"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)


def test_cli_mapping_list_terms_exclude(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert not re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_empty_db(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_divisor(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 9
_assert_tasks_for_origins(tasks, range(90))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_dry_run(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"--dry-run",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_nondivisor(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
fill_idx_storage(idx_storage, 70)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--batch-size",
"20",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (60 origins).\n"
"Scheduled 4 tasks (70 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 4
_assert_tasks_for_origins(tasks, range(70))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_mapping(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 2
_assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_two_mappings(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"--config-file",
swh_config,
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
"--mapping",
"mapping2",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 3
_assert_tasks_for_origins(
tasks,
[
1,
11,
21,
31,
41,
51,
61,
71,
81,
91,
101,
2,
12,
22,
32,
42,
52,
62,
72,
82,
92,
102,
],
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_tool(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
tool_ids = fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--tool-id",
str(tool_ids[0]),
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (55 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 6
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])


def now():
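    """Returns the current time as a timezone-aware UTC datetime."""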
return datetime.datetime.now(tz=datetime.timezone.utc)


def test_cli_journal_client_schedule(
cli_runner,
swh_config,
indexer_scheduler,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something goes wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
"--origin-metadata-task-type",
"index-origin-metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
    # The origins may be spread over several tasks, but never more tasks than
    # the number of origin visit statuses written to the journal
assert len(tasks) <= len(visit_statuses_full)
actual_origins = []
for task in tasks:
actual_task = dict(task)
assert actual_task["type"] == "index-origin-metadata"
scheduled_origins = actual_task["arguments"]["args"][0]
actual_origins.extend(scheduled_origins)
assert set(actual_origins) == {vs.origin for vs in visit_statuses_full}


def test_cli_journal_client_without_brokers(
cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Without brokers configuration, the cli fails."""
with pytest.raises(ValueError, match="brokers"):
cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
],
catch_exceptions=False,
)
@pytest.mark.parametrize("indexer_name", ["origin_intrinsic_metadata", "*"])
def test_cli_journal_client_index__origin_intrinsic_metadata(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
storage,
mocker,
swh_indexer_config,
indexer_name: str,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something goes wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
storage.revision_add([REVISION])
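    # Mock the metadata resolution pipeline: every origin's head resolves to
    # REVISION and the directory indexer returns a fixed metadata row for
    # DIRECTORY2, so the test does not depend on actual directory contents.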
mocker.patch(
"swh.indexer.metadata.get_head_swhid",
return_value=REVISION.swhid(),
)
mocker.patch(
"swh.indexer.metadata.DirectoryMetadataIndexer.index",
return_value=[
DirectoryIntrinsicMetadataRow(
id=DIRECTORY2.id,
indexer_configuration_id=1,
mappings=["cff"],
metadata={"foo": "bar"},
)
],
)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
indexer_name,
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
results = idx_storage.origin_intrinsic_metadata_get(
[status.origin for status in visit_statuses]
)
expected_results = [
OriginIntrinsicMetadataRow(
id=status.origin,
from_directory=DIRECTORY2.id,
tool={"id": 1, **swh_indexer_config["tools"]},
mappings=["cff"],
metadata={"foo": "bar"},
)
for status in sorted(visit_statuses_full, key=lambda r: r.origin)
]
assert sorted(results, key=lambda r: r.id) == expected_results
@pytest.mark.parametrize("indexer_name", ["extrinsic_metadata", "*"])
def test_cli_journal_client_index__origin_extrinsic_metadata(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
storage,
mocker,
swh_indexer_config,
indexer_name: str,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something goes wrong
)
origin = Origin("http://example.org/repo.git")
storage.origin_add([origin])
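    # Retarget the example raw extrinsic metadata at this origin and recompute
    # its id so the journal object stays self-consistent.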
raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid())
raw_extrinsic_metadata = attr.evolve(
raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash()
)
journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata])
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
indexer_name,
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
1,
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
results = idx_storage.origin_extrinsic_metadata_get([origin.url])
expected_results = [
OriginExtrinsicMetadataRow(
id=origin.url,
from_remd_id=raw_extrinsic_metadata.id,
tool={"id": 1, **swh_indexer_config["tools"]},
mappings=["github"],
metadata={
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "https://forgefed.org/ns#Repository",
"name": "test software",
},
)
]
assert sorted(results, key=lambda r: r.id) == expected_results


def test_cli_journal_client_index__content_mimetype(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
obj_storage,
storage,
mocker,
swh_indexer_config,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something goes wrong
)
contents = []
expected_results = []
content_ids = []
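    # Build a Content object per raw content and the mimetype rows the indexer
    # is expected to write for it.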
for content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items():
content = Content.from_data(raw_content)
assert content_id == content.sha1
contents.append(content)
content_ids.append(content_id)
        # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3)
        # return different results. Accepting either a single mimetype or a tuple of
        # them lets the test pass on different environments (e.g. CI tox, CI Debian,
        # dev machines, ...).
all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes]
expected_results.extend(
[
ContentMimetypeRow(
id=content.sha1,
tool={"id": 1, **swh_indexer_config["tools"]},
mimetype=mimetype,
encoding=encoding,
)
for mimetype in all_mimetypes
]
)
assert len(contents) == len(RAW_CONTENTS)
journal_writer.write_additions("content", contents)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"content_mimetype",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(contents),
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
results = idx_storage.content_mimetype_get(content_ids)
assert len(results) == len(contents)
for result in results:
assert result in expected_results


def test_cli_journal_client_index__fossology_license(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
obj_storage,
storage,
mocker,
swh_indexer_config,
):
"""Test the 'swh indexer journal-client' cli tool."""
    # Patch the license computation with a mock that returns canned results
    # from SHA1_TO_LICENSES instead of running the real license scanner
fossology_license.compute_license = mock_compute_license
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
        flush_timeout=3,  # fail early if something goes wrong
)
tool = {"id": 1, **swh_indexer_config["tools"]}
id0, id1, id2 = RAW_CONTENT_IDS
contents = []
content_ids = []
expected_results = []
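    # Build a Content object per raw content and the license rows expected from
    # the mocked license computation.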
for content_id, (raw_content, _, _) in RAW_CONTENTS.items():
content = Content.from_data(raw_content)
assert content_id == content.sha1
contents.append(content)
content_ids.append(content_id)
expected_results.extend(
[
ContentLicenseRow(id=content_id, tool=tool, license=license)
for license in SHA1_TO_LICENSES[content_id]
]
)
assert len(contents) == len(RAW_CONTENTS)
journal_writer.write_additions("content", contents)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"content_fossology_license",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(contents),
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
results = idx_storage.content_fossology_license_get(content_ids)
assert len(results) == len(expected_results)
for result in results:
assert result in expected_results
