diff --git a/PKG-INFO b/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive a batch of ids
- retrieve the associated data depending on the object type
- compute an index for each object
- store the result in swh's storage
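To make the procedure concrete, here is a minimal sketch of that loop in Python; every helper below is a hypothetical stand-in for the real swh-indexer lookup, computation and storage methods:

    # Sketch only: trivial stand-ins for the real swh-indexer methods.
    def retrieve_content(sha1: str) -> bytes:
        return b"raw content bytes for " + sha1.encode()

    def compute_index(data: bytes) -> dict:
        return {"length": len(data)}

    results = {}

    def store_result(sha1: str, result: dict) -> None:
        results[sha1] = result

    def run(sha1_ids) -> None:
        for sha1 in sha1_ids:                # receive a batch of ids
            data = retrieve_content(sha1)    # retrieve the associated data
            result = compute_index(data)     # compute an index for the object
            store_result(sha1, result)       # store the result in swh's storage

    run(["sha1-a", "sha1-b"])
    assert set(results) == {"sha1-a", "sha1-b"}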
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
  and mimetype
- language (queue swh_indexer_content_language): detect the
  programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
  license
- metadata: translate a file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
  from the content_metadata table in storage, or runs the content indexer to
  translate files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- storing that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive a batch of ids
- retrieve the associated data depending on the object type
- compute an index for each object
- store the result in swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
  and mimetype
- language (queue swh_indexer_content_language): detect the
  programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
  license
- metadata: translate a file into a translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
  from the content_metadata table in storage, or runs the content indexer to
  translate files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh/indexer/storage/api/server.py b/swh/indexer/storage/api/server.py
index 09979ca..a6d6f32 100644
--- a/swh/indexer/storage/api/server.py
+++ b/swh/indexer/storage/api/server.py
@@ -1,118 +1,120 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import os
+from typing import Any, Dict, Optional
from swh.core import config
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.indexer.storage import INDEXER_CFG_KEY, get_indexer_storage
from swh.indexer.storage.exc import IndexerStorageArgumentException
from swh.indexer.storage.interface import IndexerStorageInterface
from .serializers import DECODERS, ENCODERS
def get_storage():
    global storage
    if not storage:
        storage = get_indexer_storage(**app.config[INDEXER_CFG_KEY])

    return storage
class IndexerStorageServerApp(RPCServerApp):
    extra_type_decoders = DECODERS
    extra_type_encoders = ENCODERS
app = IndexerStorageServerApp(
    __name__, backend_class=IndexerStorageInterface, backend_factory=get_storage
)
storage = None
@app.errorhandler(Exception)
def my_error_handler(exception):
    return error_handler(exception, encode_data)
@app.errorhandler(IndexerStorageArgumentException)
def argument_error_handler(exception):
    return error_handler(exception, encode_data, status_code=400)
@app.route("/")
def index():
return "SWH Indexer Storage API server"
api_cfg = None
-def load_and_check_config(config_file, type="local"):
+def load_and_check_config(
+    config_path: Optional[str], type: str = "local"
+) -> Dict[str, Any]:
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
- config_file (str): Path to the configuration file to load
- type (str): configuration type. For 'local' type, more
- checks are done.
+ config_path: Path to the configuration file to load
+ type: configuration type. For 'local' type, more
+ checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
-    if not config_file:
+    if not config_path:
        raise EnvironmentError("Configuration file must be defined")

-    if not os.path.exists(config_file):
-        raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
+    if not os.path.exists(config_path):
+        raise FileNotFoundError(f"Configuration file {config_path} does not exist")

-    cfg = config.read(config_file)
+    cfg = config.read(config_path)
if "indexer_storage" not in cfg:
raise KeyError("Missing '%indexer_storage' configuration")
if type == "local":
vcfg = cfg["indexer_storage"]
cls = vcfg.get("cls")
if cls != "local":
raise ValueError(
"The indexer_storage backend can only be started with a "
"'local' configuration"
)
- args = vcfg["args"]
- if not args.get("db"):
+ if not vcfg.get("db"):
raise ValueError("Invalid configuration; missing 'db' config entry")
return cfg
def make_app_from_configfile():
"""Run the WSGI app from the webserver, loading the configuration from
a configuration file.
SWH_CONFIG_FILENAME environment variable defines the
configuration path to load.
"""
global api_cfg
if not api_cfg:
- config_file = os.environ.get("SWH_CONFIG_FILENAME")
- api_cfg = load_and_check_config(config_file)
+ config_path = os.environ.get("SWH_CONFIG_FILENAME")
+ api_cfg = load_and_check_config(config_path)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print("Deprecated. Use swh-indexer")
diff --git a/swh/indexer/tests/storage/test_server.py b/swh/indexer/tests/storage/test_server.py
index 85d1f06..91cf037 100644
--- a/swh/indexer/tests/storage/test_server.py
+++ b/swh/indexer/tests/storage/test_server.py
@@ -1,96 +1,99 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import yaml
from swh.indexer.storage.api.server import load_and_check_config
def prepare_config_file(tmpdir, content, name="config.yml") -> str:
"""Prepare configuration file in `$tmpdir/name` with content `content`.
Args:
tmpdir (LocalPath): root directory
content (str/dict): Content of the file either as string or as a dict.
If a dict, converts the dict into a yaml string.
name (str): configuration filename
Returns
path (str) of the configuration file prepared.
"""
config_path = tmpdir / name
if isinstance(content, dict): # convert if needed
content = yaml.dump(content)
config_path.write_text(content, encoding="utf-8")
# pytest on python3.5 does not support LocalPath manipulation, so
# convert path to string
return str(config_path)
-def test_load_and_check_config_no_configuration() -> None:
-    """Inexistent configuration files raises"""
-    with pytest.raises(EnvironmentError) as e:
-        load_and_check_config(None)
+@pytest.mark.parametrize("config_path", [None, ""])
+def test_load_and_check_config_no_configuration(config_path) -> None:
+    """An empty or undefined configuration file path raises"""
+    with pytest.raises(EnvironmentError, match="Configuration file must be defined"):
+        load_and_check_config(config_path)

-    assert e.value.args[0] == "Configuration file must be defined"
+def test_load_and_check_inexistent_config_path() -> None:
+    """Nonexistent configuration file raises"""
    config_path = "/indexer/inexistent/config.yml"
-    with pytest.raises(FileNotFoundError) as e:
+    expected_error = f"Configuration file {config_path} does not exist"
+    with pytest.raises(FileNotFoundError, match=expected_error):
        load_and_check_config(config_path)

-    assert e.value.args[0] == "Configuration file %s does not exist" % (config_path,)
-
def test_load_and_check_config_wrong_configuration(tmpdir) -> None:
    """Wrong configuration raises"""
    config_path = prepare_config_file(tmpdir, "something: useless")
-    with pytest.raises(KeyError) as e:
+    with pytest.raises(KeyError, match="Missing 'indexer_storage' configuration"):
        load_and_check_config(config_path)

-    assert e.value.args[0] == "Missing '%indexer_storage' configuration"
-
-def test_load_and_check_config_remote_config_local_type_raise(tmpdir) -> None:
-    """'local' configuration without 'local' storage raises"""
-    config = {"indexer_storage": {"cls": "remote", "args": {}}}
-    config_path = prepare_config_file(tmpdir, config)
-    with pytest.raises(ValueError) as e:
-        load_and_check_config(config_path, type="local")
+@pytest.mark.parametrize("class_storage", ["remote", "memory"])
+def test_load_and_check_config_remote_config_local_type_raise(
+    class_storage, tmpdir
+) -> None:
+    """Any configuration other than 'local' (the default) is rejected"""
+    assert class_storage != "local"
+    incompatible_config = {"indexer_storage": {"cls": class_storage}}
+    config_path = prepare_config_file(tmpdir, incompatible_config)

-    assert (
-        e.value.args[0]
-        == "The indexer_storage backend can only be started with a 'local' "
+    expected_error = (
+        "The indexer_storage backend can only be started with a 'local' "
        "configuration"
    )
+    with pytest.raises(ValueError, match=expected_error):
+        load_and_check_config(config_path)
+    with pytest.raises(ValueError, match=expected_error):
+        load_and_check_config(config_path, type="local")
+
+
+def test_load_and_check_config_remote_config_fine(tmpdir) -> None:
+    """Remote configuration is fine (when changing the default type)"""
+    config = {"indexer_storage": {"cls": "remote"}}
+    config_path = prepare_config_file(tmpdir, config)
+    cfg = load_and_check_config(config_path, type="any")
+
+    assert cfg == config
def test_load_and_check_config_local_incomplete_configuration(tmpdir) -> None:
"""Incomplete 'local' configuration should raise"""
- config = {"indexer_storage": {"cls": "local", "args": {}}}
+ config = {"indexer_storage": {"cls": "local"}}
+ expected_error = "Invalid configuration; missing 'db' config entry"
config_path = prepare_config_file(tmpdir, config)
- with pytest.raises(ValueError) as e:
+ with pytest.raises(ValueError, match=expected_error):
load_and_check_config(config_path)
- assert e.value.args[0] == "Invalid configuration; missing 'db' config entry"
-
def test_load_and_check_config_local_config_fine(tmpdir) -> None:
- """'Remote configuration is fine"""
- config = {"indexer_storage": {"cls": "local", "args": {"db": "db",}}}
+ """'Complete 'local' configuration is fine"""
+ config = {"indexer_storage": {"cls": "local", "db": "db",}}
config_path = prepare_config_file(tmpdir, config)
cfg = load_and_check_config(config_path, type="local")
assert cfg == config
-
-
-def test_load_and_check_config_remote_config_fine(tmpdir) -> None:
-    """'Remote configuration is fine"""
-    config = {"indexer_storage": {"cls": "remote", "args": {}}}
-    config_path = prepare_config_file(tmpdir, config)
-    cfg = load_and_check_config(config_path, type="any")
-
-    assert cfg == config
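One behavior worth keeping in mind with the `match=` rewrites above: `pytest.raises(..., match=...)` applies `re.search` to the string form of the exception, so a literal message containing regex metacharacters (for instance parentheses) should be passed through `re.escape`. A standalone sketch with a made-up message:

    import re

    import pytest


    def test_match_is_a_regex_search() -> None:
        # match= is searched as a regex against str(exception): plain
        # substrings work, but metacharacters such as '(' must be escaped.
        message = "cannot start backend (expected cls 'local')"
        with pytest.raises(ValueError, match=re.escape(message)):
            raise ValueError(message)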
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 6da0e7b..59c9528 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,386 +1,385 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import reduce
import re
import tempfile
from typing import Any, Dict, List
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Consumer, Producer
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
    OriginIntrinsicMetadataRow,
    RevisionIntrinsicMetadataRow,
)
from swh.journal.serializers import value_to_kafka
from swh.model.hashutil import hash_to_bytes
CLI_CONFIG = """
scheduler:
    cls: foo
    args: {}
storage:
    cls: memory
indexer_storage:
    cls: memory
-    args: {}
"""
def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
    tools: List[Dict[str, Any]] = [
        {"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},}
        for i in range(2)
    ]
    tools = idx_storage.indexer_configuration_add(tools)

    origin_metadata = [
        OriginIntrinsicMetadataRow(
            id="file://dev/%04d" % origin_id,
            from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)),
            indexer_configuration_id=tools[origin_id % 2]["id"],
            metadata={"name": "origin %d" % origin_id},
            mappings=["mapping%d" % (origin_id % 10)],
        )
        for origin_id in range(nb_rows)
    ]
    revision_metadata = [
        RevisionIntrinsicMetadataRow(
            id=hash_to_bytes("abcd{:0>4}".format(origin_id)),
            indexer_configuration_id=tools[origin_id % 2]["id"],
            metadata={"name": "origin %d" % origin_id},
            mappings=["mapping%d" % (origin_id % 10)],
        )
        for origin_id in range(nb_rows)
    ]

    idx_storage.revision_intrinsic_metadata_add(revision_metadata)
    idx_storage.origin_intrinsic_metadata_add(origin_metadata)

    return [tool["id"] for tool in tools]
def _origins_in_task_args(tasks):
"""Returns the set of origins contained in the arguments of the
provided tasks (assumed to be of type index-origin-metadata)."""
return reduce(
set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
)
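A tiny standalone illustration of what `_origins_in_task_args` computes, with made-up task dicts shaped like the scheduler's (each task's first positional argument is a list of origin URLs):

    from functools import reduce

    tasks = [
        {"arguments": {"args": [["file://dev/0001", "file://dev/0002"]]}},
        {"arguments": {"args": [["file://dev/0002", "file://dev/0003"]]}},
    ]

    # Union of every task's first positional argument, starting from set().
    origins = reduce(
        set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
    )
    assert origins == {"file://dev/0001", "file://dev/0002", "file://dev/0003"}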
def _assert_tasks_for_origins(tasks, origins):
expected_kwargs = {"policy_update": "update-dups"}
assert {task["type"] for task in tasks} == {"index-origin-metadata"}
assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
for task in tasks:
assert task["arguments"]["kwargs"] == expected_kwargs, task
assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins])
def invoke(scheduler, catch_exceptions, args):
    runner = CliRunner()
    with patch(
        "swh.scheduler.get_scheduler"
    ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
        "a", suffix=".yml"
    ) as config_fd:
        config_fd.write(CLI_CONFIG)
        config_fd.seek(0)
        get_scheduler_mock.return_value = scheduler
        result = runner.invoke(indexer_cli_group, ["-C" + config_fd.name] + args)
    if not catch_exceptions and result.exception:
        print(result.output)
        raise result.exception
    return result
def test_mapping_list(indexer_scheduler):
result = invoke(indexer_scheduler, False, ["mapping", "list",])
expected_output = "\n".join(
["codemeta", "gemspec", "maven", "npm", "pkg-info", "",]
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
def test_mapping_list_terms(indexer_scheduler):
result = invoke(indexer_scheduler, False, ["mapping", "list-terms",])
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
def test_mapping_list_terms_exclude(indexer_scheduler):
    result = invoke(
        indexer_scheduler,
        False,
        ["mapping", "list-terms", "--exclude-mapping", "codemeta"],
    )
    assert result.exit_code == 0, result.output
    assert re.search(r"http://schema.org/url:\n.*npm", result.output)
    assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
    assert not re.search(
        r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
        result.output,
    )
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage):
result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 9
_assert_tasks_for_origins(tasks, range(90))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = invoke(
indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",]
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage):
"""Tests the re-indexing when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
fill_idx_storage(idx_storage, 70)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--batch-size", "20",],
)
# Check the output
expected_output = (
"Scheduled 3 tasks (60 origins).\n"
"Scheduled 4 tasks (70 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 4
_assert_tasks_for_origins(tasks, range(70))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_one_mapping(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--mapping", "mapping1",],
)
# Check the output
expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 2
_assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_two_mappings(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
[
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
"--mapping",
"mapping2",
],
)
# Check the output
expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 3
_assert_tasks_for_origins(
tasks,
[
1,
11,
21,
31,
41,
51,
61,
71,
81,
91,
101,
2,
12,
22,
32,
42,
52,
62,
72,
82,
92,
102,
],
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_origin_metadata_reindex_filter_one_tool(
indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
tool_ids = fill_idx_storage(idx_storage, 110)
result = invoke(
indexer_scheduler,
False,
["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),],
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (55 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 6
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
def test_journal_client(
    storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
):
    """Test the 'swh indexer journal-client' cli tool."""
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test producer",
            "acks": "all",
        }
    )

    STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
    producer.produce(
        topic=kafka_prefix + ".origin_visit",
        key=b"bogus",
        value=value_to_kafka(STATUS),
    )

    result = invoke(
        indexer_scheduler,
        False,
        [
            "journal-client",
            "--stop-after-objects",
            "1",
            "--broker",
            kafka_server,
            "--prefix",
            kafka_prefix,
            "--group-id",
            "test-consumer",
        ],
    )

    # Check the output
    expected_output = "Done.\n"
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 1
    _assert_tasks_for_origins(tasks, [0])
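The common thread of this diff is the flattening of the `indexer_storage` configuration: backend settings such as `db` now live directly under `indexer_storage` instead of inside an `args` sub-dict, which is why `CLI_CONFIG` drops `args: {}` and why `load_and_check_config` reads `vcfg.get("db")` directly. A before/after sketch of the migration; the DSN value is hypothetical:

    # Before: backend arguments nested under an 'args' sub-dict.
    old_config = {
        "indexer_storage": {"cls": "local", "args": {"db": "service=swh-indexer-dev"}}
    }

    # After: the same settings flattened one level up.
    new_config = {"indexer_storage": {"cls": "local", "db": "service=swh-indexer-dev"}}

    # Equivalent migration: lift every key out of 'args'.
    migrated = {
        key: {**{k: v for k, v in sec.items() if k != "args"}, **sec.get("args", {})}
        for key, sec in old_config.items()
    }
    assert migrated == new_config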
