F9347359: swh-indexer diff (text/x-diff, 27 KB), attached to rDCIDX Metadata indexer
diff --git a/PKG-INFO b/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive a batch of ids
- retrieve the associated data depending on object type
- compute some index for that object
- store the result in swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate the file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves the
translated_metadata from the content_metadata table in storage, or runs
the content indexer to translate the files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
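The indexation procedure listed in the description above maps naturally onto a small driver loop. The following is a minimal sketch under assumed names; BaseIndexer, lookup, index, and persist_index are illustrative placeholders, not the actual swh.indexer API:

from typing import Any, Dict, Iterable, List

class BaseIndexer:
    """Illustrative skeleton of the indexation procedure described above."""

    def __init__(self, storage) -> None:
        self.storage = storage  # backend used to look objects up and store results

    def lookup(self, object_id: bytes) -> bytes:
        raise NotImplementedError  # retrieve the associated data for one object

    def index(self, object_id: bytes, data: bytes) -> Dict[str, Any]:
        raise NotImplementedError  # compute some index for that object

    def persist_index(self, results: List[Dict[str, Any]]) -> None:
        raise NotImplementedError  # store the results in swh's storage

    def run(self, ids: Iterable[bytes]) -> None:
        # receive a batch of ids, process each one, then store the results
        results = [self.index(i, self.lookup(i)) for i in ids]
        self.persist_index(results)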
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive a batch of ids
- retrieve the associated data depending on object type
- compute some index for that object
- store the result in swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate the file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves the
translated_metadata from the content_metadata table in storage, or runs
the content indexer to translate the files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh/indexer/storage/api/server.py b/swh/indexer/storage/api/server.py
index 09979ca..a6d6f32 100644
--- a/swh/indexer/storage/api/server.py
+++ b/swh/indexer/storage/api/server.py
@@ -1,118 +1,120 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import os
+from typing import Any, Dict, Optional
from swh.core import config
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.indexer.storage import INDEXER_CFG_KEY, get_indexer_storage
from swh.indexer.storage.exc import IndexerStorageArgumentException
from swh.indexer.storage.interface import IndexerStorageInterface
from .serializers import DECODERS, ENCODERS
def get_storage():
global storage
if not storage:
storage = get_indexer_storage(**app.config[INDEXER_CFG_KEY])
return storage
class IndexerStorageServerApp(RPCServerApp):
extra_type_decoders = DECODERS
extra_type_encoders = ENCODERS
app = IndexerStorageServerApp(
__name__, backend_class=IndexerStorageInterface, backend_factory=get_storage
)
storage = None
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
@app.errorhandler(IndexerStorageArgumentException)
def argument_error_handler(exception):
return error_handler(exception, encode_data, status_code=400)
@app.route("/")
def index():
return "SWH Indexer Storage API server"
api_cfg = None
-def load_and_check_config(config_file, type="local"):
+def load_and_check_config(
+ config_path: Optional[str], type: str = "local"
+) -> Dict[str, Any]:
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
- config_file (str): Path to the configuration file to load
- type (str): configuration type. For 'local' type, more
- checks are done.
+ config_path: Path to the configuration file to load
+ type: configuration type. For 'local' type, more
+ checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
- if not config_file:
+ if not config_path:
raise EnvironmentError("Configuration file must be defined")
- if not os.path.exists(config_file):
- raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
+ if not os.path.exists(config_path):
+ raise FileNotFoundError(f"Configuration file {config_path} does not exist")
- cfg = config.read(config_file)
+ cfg = config.read(config_path)
if "indexer_storage" not in cfg:
raise KeyError("Missing '%indexer_storage' configuration")
if type == "local":
vcfg = cfg["indexer_storage"]
cls = vcfg.get("cls")
if cls != "local":
raise ValueError(
"The indexer_storage backend can only be started with a "
"'local' configuration"
)
- args = vcfg["args"]
- if not args.get("db"):
+ if not vcfg.get("db"):
raise ValueError("Invalid configuration; missing 'db' config entry")
return cfg
def make_app_from_configfile():
"""Run the WSGI app from the webserver, loading the configuration from
a configuration file.
SWH_CONFIG_FILENAME environment variable defines the
configuration path to load.
"""
global api_cfg
if not api_cfg:
- config_file = os.environ.get("SWH_CONFIG_FILENAME")
- api_cfg = load_and_check_config(config_file)
+ config_path = os.environ.get("SWH_CONFIG_FILENAME")
+ api_cfg = load_and_check_config(config_path)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print("Deprecated. Use swh-indexer")
diff --git a/swh/indexer/tests/storage/test_server.py b/swh/indexer/tests/storage/test_server.py
index 85d1f06..91cf037 100644
--- a/swh/indexer/tests/storage/test_server.py
+++ b/swh/indexer/tests/storage/test_server.py
@@ -1,96 +1,99 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import yaml
from swh.indexer.storage.api.server import load_and_check_config
def prepare_config_file(tmpdir, content, name="config.yml") -> str:
"""Prepare configuration file in `$tmpdir/name` with content `content`.
Args:
tmpdir (LocalPath): root directory
content (str/dict): Content of the file either as string or as a dict.
If a dict, converts the dict into a yaml string.
name (str): configuration filename
Returns:
path (str) of the configuration file prepared.
"""
config_path = tmpdir / name
if isinstance(content, dict): # convert if needed
content = yaml.dump(content)
config_path.write_text(content, encoding="utf-8")
# pytest on python3.5 does not support LocalPath manipulation, so
# convert path to string
return str(config_path)
-def test_load_and_check_config_no_configuration() -> None:
- """Inexistent configuration files raises"""
- with pytest.raises(EnvironmentError) as e:
- load_and_check_config(None)
+@pytest.mark.parametrize("config_path", [None, ""])
+def test_load_and_check_config_no_configuration(config_path) -> None:
+ """Irrelevant configuration file path raises"""
+ with pytest.raises(EnvironmentError, match="Configuration file must be defined"):
+ load_and_check_config(config_path)
- assert e.value.args[0] == "Configuration file must be defined"
+def test_load_and_check_inexistent_config_path() -> None:
+ """Inexistent configuration file raises"""
config_path = "/indexer/inexistent/config.yml"
- with pytest.raises(FileNotFoundError) as e:
+ expected_error = f"Configuration file {config_path} does not exist"
+ with pytest.raises(FileNotFoundError, match=expected_error):
load_and_check_config(config_path)
- assert e.value.args[0] == "Configuration file %s does not exist" % (config_path,)
-
def test_load_and_check_config_wrong_configuration(tmpdir) -> None:
"""Wrong configuration raises"""
config_path = prepare_config_file(tmpdir, "something: useless")
- with pytest.raises(KeyError) as e:
+ with pytest.raises(KeyError, match="Missing 'indexer_storage' configuration"):
load_and_check_config(config_path)
- assert e.value.args[0] == "Missing '%indexer_storage' configuration"
-
-def test_load_and_check_config_remote_config_local_type_raise(tmpdir) -> None:
- """'local' configuration without 'local' storage raises"""
- config = {"indexer_storage": {"cls": "remote", "args": {}}}
- config_path = prepare_config_file(tmpdir, config)
- with pytest.raises(ValueError) as e:
- load_and_check_config(config_path, type="local")
+@pytest.mark.parametrize("class_storage", ["remote", "memory"])
+def test_load_and_check_config_remote_config_local_type_raise(
+ class_storage, tmpdir
+) -> None:
+ """Any other configuration than 'local' (the default) is rejected"""
+ assert class_storage != "local"
+ incompatible_config = {"indexer_storage": {"cls": class_storage}}
+ config_path = prepare_config_file(tmpdir, incompatible_config)
- assert (
- e.value.args[0]
- == "The indexer_storage backend can only be started with a 'local' "
+ expected_error = (
+ "The indexer_storage backend can only be started with a 'local' "
"configuration"
)
+ with pytest.raises(ValueError, match=expected_error):
+ load_and_check_config(config_path)
+ with pytest.raises(ValueError, match=expected_error):
+ load_and_check_config(config_path, type="local")
+
+
+def test_load_and_check_config_remote_config_fine(tmpdir) -> None:
+ """'Remote configuration is fine (when changing the default type)"""
+ config = {"indexer_storage": {"cls": "remote"}}
+ config_path = prepare_config_file(tmpdir, config)
+ cfg = load_and_check_config(config_path, type="any")
+
+ assert cfg == config
def test_load_and_check_config_local_incomplete_configuration(tmpdir) -> None:
"""Incomplete 'local' configuration should raise"""
- config = {"indexer_storage": {"cls": "local", "args": {}}}
+ config = {"indexer_storage": {"cls": "local"}}
+ expected_error = "Invalid configuration; missing 'db' config entry"
config_path = prepare_config_file(tmpdir, config)
- with pytest.raises(ValueError) as e:
+ with pytest.raises(ValueError, match=expected_error):
load_and_check_config(config_path)
- assert e.value.args[0] == "Invalid configuration; missing 'db' config entry"
-
def test_load_and_check_config_local_config_fine(tmpdir) -> None:
- """'Remote configuration is fine"""
- config = {"indexer_storage": {"cls": "local", "args": {"db": "db",}}}
+ """'Complete 'local' configuration is fine"""
+ config = {"indexer_storage": {"cls": "local", "db": "db",}}
config_path = prepare_config_file(tmpdir, config)
cfg = load_and_check_config(config_path, type="local")
assert cfg == config
-
-
-def test_load_and_check_config_remote_config_fine(tmpdir) -> None:
- """'Remote configuration is fine"""
- config = {"indexer_storage": {"cls": "remote", "args": {}}}
- config_path = prepare_config_file(tmpdir, config)
- cfg = load_and_check_config(config_path, type="any")
-
- assert cfg == config
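The rewritten tests above switch from inspecting e.value.args[0] to pytest.raises(..., match=...). Note that match is applied with re.search against str(excinfo.value), so an expected message containing regex metacharacters should go through re.escape. A small sketch of the idiom; the message is illustrative:

import re

import pytest

def test_match_is_a_regex() -> None:
    # parentheses are regex metacharacters, hence re.escape for a literal match
    message = "Configuration file /tmp/(missing).yml does not exist"
    with pytest.raises(FileNotFoundError, match=re.escape(message)):
        raise FileNotFoundError(message)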
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 6da0e7b..59c9528 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,386 +1,385 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import reduce
import re
import tempfile
from typing import Any, Dict, List
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Consumer, Producer
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
OriginIntrinsicMetadataRow,
RevisionIntrinsicMetadataRow,
)
from swh.journal.serializers import value_to_kafka
from swh.model.hashutil import hash_to_bytes
CLI_CONFIG = """
scheduler:
cls: foo
args: {}
storage:
cls: memory
indexer_storage:
cls: memory
- args: {}
"""
def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
tools: List[Dict[str, Any]] = [
{"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},}
for i in range(2)
]
tools = idx_storage.indexer_configuration_add(tools)
origin_metadata = [
OriginIntrinsicMetadataRow(
id="file://dev/%04d" % origin_id,
from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
revision_metadata = [
RevisionIntrinsicMetadataRow(
id=hash_to_bytes("abcd{:0>4}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
idx_storage.revision_intrinsic_metadata_add(revision_metadata)
idx_storage.origin_intrinsic_metadata_add(origin_metadata)
return [tool["id"] for tool in tools]
def _origins_in_task_args(tasks):
"""Returns the set of origins contained in the arguments of the
provided tasks (assumed to be of type index-origin-metadata)."""
return reduce(
set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
)
def _assert_tasks_for_origins(tasks, origins):
expected_kwargs = {"policy_update": "update-dups"}
assert {task["type"] for task in tasks} == {"index-origin-metadata"}
assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
for task in tasks:
assert task["arguments"]["kwargs"] == expected_kwargs, task
assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins])
def invoke(scheduler, catch_exceptions, args):
runner = CliRunner()
with patch(
"swh.scheduler.get_scheduler"
) as get_scheduler_mock, tempfile.NamedTemporaryFile(
"a", suffix=".yml"
) as config_fd:
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
result = runner.invoke(indexer_cli_group, ["-C" + config_fd.name] + args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_mapping_list(indexer_scheduler):
result = invoke(indexer_scheduler, False, ["mapping", "list",])
expected_output = "\n".join(
["codemeta", "gemspec", "maven", "npm", "pkg-info", "",]
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
def test_mapping_list_terms(indexer_scheduler):
result = invoke(indexer_scheduler, False, ["mapping", "list-terms",])
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
def test_mapping_list_terms_exclude(indexer_scheduler):
result = invoke(
indexer_scheduler,
False,
["mapping", "list-terms", "--exclude-mapping", "codemeta"],
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert not re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage):
result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 9
_assert_tasks_for_origins(tasks, range(90))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = invoke(
indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",]
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
fill_idx_storage(idx_storage, 70)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--batch-size", "20",],
)
# Check the output
expected_output = (
"Scheduled 3 tasks (60 origins).\n"
"Scheduled 4 tasks (70 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 4
_assert_tasks_for_origins(tasks, range(70))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_one_mapping(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--mapping", "mapping1",],
)
# Check the output
expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 2
_assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_two_mappings(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
[
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
"--mapping",
"mapping2",
],
)
# Check the output
expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 3
_assert_tasks_for_origins(
tasks,
[
1,
11,
21,
31,
41,
51,
61,
71,
81,
91,
101,
2,
12,
22,
32,
42,
52,
62,
72,
82,
92,
102,
],
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_one_tool(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
tool_ids = fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),],
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (55 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 6
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
def test_journal_client(
storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Test the 'swh indexer journal-client' cli tool."""
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
producer.produce(
topic=kafka_prefix + ".origin_visit",
key=b"bogus",
value=value_to_kafka(STATUS),
)
result = invoke(
indexer_scheduler,
False,
[
"journal-client",
"--stop-after-objects",
"1",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
],
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 1
_assert_tasks_for_origins(tasks, [0])
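The invoke() helper above captures a reusable pattern for testing click applications: write the YAML configuration to a temporary file, patch the external dependency (here swh.scheduler.get_scheduler), and drive the command group through CliRunner. A generic hedged sketch of the same pattern; my_cli_group and its ping command are illustrative placeholders, not swh entry points:

import tempfile

import click
from click.testing import CliRunner

@click.group()
@click.option("-C", "--config-file", type=click.Path())
def my_cli_group(config_file):
    """Toy command group standing in for indexer_cli_group."""

@my_cli_group.command("ping")
def ping():
    click.echo("pong")

def test_ping() -> None:
    runner = CliRunner()
    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
        config_fd.write("scheduler:\n  cls: foo\n")
        config_fd.seek(0)
        result = runner.invoke(my_cli_group, ["-C" + config_fd.name, "ping"])
    assert result.exit_code == 0
    assert result.output == "pong\n"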