diff --git a/PKG-INFO b/PKG-INFO
index 0dcc595..bddb916 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 1.1.0
+Version: 1.2.0
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- store that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute some index for that object
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
  and mimetype
- language (queue swh_indexer_content_language): detect the
  programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
  license
- metadata: translate file into translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
  from the content_metadata table in storage, or runs the content indexer to
  translate the files.
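
The indexation procedure listed in this README amounts to a simple loop over a batch of ids. The sketch below is schematic only: the retrieve, compute and store callables are placeholders for the storage lookup, the index computation and the indexer-storage write, not swh-indexer APIs.

from typing import Any, Callable, Iterable, List

def index_batch(
    ids: Iterable[bytes],
    retrieve: Callable[[bytes], Any],
    compute: Callable[[bytes, Any], List[dict]],
    store: Callable[[List[dict]], None],
) -> None:
    # receive a batch of ids, retrieve the associated data (depending on the
    # object type), compute some index for each object, then store the results
    results: List[dict] = []
    for id_ in ids:
        data = retrieve(id_)
        results.extend(compute(id_, data))
    store(results)
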
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 0dcc595..bddb916 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 1.1.0
+Version: 1.2.0
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- store that information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute some index for that object
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
  and mimetype
- language (queue swh_indexer_content_language): detect the
  programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
  license
- metadata: translate file into translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
  from the content_metadata table in storage, or runs the content indexer to
  translate the files.
diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
index 33d462c..e6581a6 100644
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -1,348 +1,363 @@
# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Callable, Dict, Iterator, List, Optional
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group
@swh_cli_group.group(
name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup
)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(
exists=True,
dir_okay=False,
),
help="Configuration file.",
)
@click.pass_context
def indexer_cli_group(ctx, config_file):
"""Software Heritage Indexer tools.
The Indexer is used to mine the content of the archive and extract derived
information from archive source code artifacts.
"""
from swh.core import config
ctx.ensure_object(dict)
conf = config.read(config_file)
ctx.obj["config"] = conf
def _get_api(getter, config, config_key, url):
if url:
config[config_key] = {"cls": "remote", "url": url}
elif config_key not in config:
raise click.ClickException("Missing configuration for {}".format(config_key))
return getter(**config[config_key])
@indexer_cli_group.group("mapping")
def mapping():
"""Manage Software Heritage Indexer mappings."""
pass
@mapping.command("list")
def mapping_list():
"""Prints the list of known mappings."""
from swh.indexer import metadata_dictionary
mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()]
mapping_names.sort()
for mapping_name in mapping_names:
click.echo(mapping_name)
@mapping.command("list-terms")
@click.option(
"--exclude-mapping", multiple=True, help="Exclude the given mapping from the output"
)
@click.option(
"--concise",
is_flag=True,
default=False,
help="Don't print the list of mappings supporting each term.",
)
def mapping_list_terms(concise, exclude_mapping):
"""Prints the list of known CodeMeta terms, and which mappings
support them."""
from swh.indexer import metadata_dictionary
properties = metadata_dictionary.list_terms()
for (property_name, supported_mappings) in sorted(properties.items()):
supported_mappings = {m.name for m in supported_mappings}
supported_mappings -= set(exclude_mapping)
if supported_mappings:
if concise:
click.echo(property_name)
else:
click.echo("{}:".format(property_name))
click.echo("\t" + ", ".join(sorted(supported_mappings)))
@mapping.command("translate")
@click.argument("mapping-name")
@click.argument("file", type=click.File("rb"))
def mapping_translate(mapping_name, file):
"""Translates file from mapping-name to codemeta format."""
import json
from swh.indexer import metadata_dictionary
mapping_cls = [
cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name
]
if not mapping_cls:
raise click.ClickException("Unknown mapping {}".format(mapping_name))
assert len(mapping_cls) == 1
mapping_cls = mapping_cls[0]
mapping = mapping_cls()
codemeta_doc = mapping.translate(file.read())
click.echo(json.dumps(codemeta_doc, indent=4))
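
The same translation can be driven from Python. A hedged sketch, assuming the "npm" mapping listed by the mapping list command and a made-up package.json payload:

import json
from swh.indexer import metadata_dictionary

# look the mapping up by its short name, as the command above does, then
# translate a raw manifest into a CodeMeta document; the payload is invented
raw = b'{"name": "example", "license": "GPL-3.0"}'
matching = [cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == "npm"]
codemeta_doc = matching[0]().translate(raw)
print(json.dumps(codemeta_doc, indent=4))
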
@indexer_cli_group.group("schedule")
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
@click.option(
"--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API"
)
@click.option(
"--storage-url", "-g", default=None, help="URL of the (graph) storage API"
)
@click.option(
"--dry-run/--no-dry-run",
is_flag=True,
default=False,
help="List only what would be scheduled.",
)
@click.pass_context
def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run):
"""Manipulate Software Heritage Indexer tasks.
Via SWH Scheduler's API."""
from swh.indexer.storage import get_indexer_storage
from swh.scheduler import get_scheduler
from swh.storage import get_storage
ctx.obj["indexer_storage"] = _get_api(
get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url
)
ctx.obj["storage"] = _get_api(
get_storage, ctx.obj["config"], "storage", storage_url
)
ctx.obj["scheduler"] = _get_api(
get_scheduler, ctx.obj["config"], "scheduler", scheduler_url
)
if dry_run:
ctx.obj["scheduler"] = None
def list_origins_by_producer(idx_storage, mappings, tool_ids) -> Iterator[str]:
next_page_token = ""
limit = 10000
while next_page_token is not None:
result = idx_storage.origin_intrinsic_metadata_search_by_producer(
page_token=next_page_token,
limit=limit,
ids_only=True,
mappings=mappings or None,
tool_ids=tool_ids or None,
)
next_page_token = result.next_page_token
yield from result.results
@schedule.command("reindex_origin_metadata")
@click.option(
"--batch-size",
"-b",
"origin_batch_size",
default=10,
show_default=True,
type=int,
help="Number of origins per task",
)
@click.option(
"--tool-id",
"-t",
"tool_ids",
type=int,
multiple=True,
help="Restrict search of old metadata to this/these tool ids.",
)
@click.option(
"--mapping",
"-m",
"mappings",
multiple=True,
help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')",
)
@click.option(
"--task-type",
default="index-origin-metadata",
show_default=True,
help="Name of the task type to schedule.",
)
@click.pass_context
def schedule_origin_metadata_reindex(
ctx, origin_batch_size, tool_ids, mappings, task_type
):
"""Schedules indexing tasks for origins that were already indexed."""
from swh.scheduler.cli_utils import schedule_origin_batches
idx_storage = ctx.obj["indexer_storage"]
scheduler = ctx.obj["scheduler"]
origins = list_origins_by_producer(idx_storage, mappings, tool_ids)
kwargs = {"retries_left": 1}
schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs)
@indexer_cli_group.command("journal-client")
@click.argument(
"indexer",
- type=click.Choice(["origin-intrinsic-metadata"]),
+ type=click.Choice(["origin-intrinsic-metadata", "*"]),
required=False
# TODO: remove required=False after we stop using it
)
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
@click.option(
"--origin-metadata-task-type",
default="index-origin-metadata",
help="Name of the task running the origin metadata indexer.",
)
@click.option(
"--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to."
)
@click.option(
"--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from."
)
@click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.")
@click.option(
"--stop-after-objects",
"-m",
default=None,
type=int,
help="Maximum number of objects to replay. Default is to run forever.",
)
@click.pass_context
def journal_client(
ctx,
indexer: Optional[str],
scheduler_url: str,
origin_metadata_task_type: str,
brokers: List[str],
prefix: str,
group_id: str,
stop_after_objects: Optional[int],
):
- """Listens for new objects from the SWH Journal, and schedules tasks
- to run relevant indexers (currently, only origin-intrinsic-metadata)
- on these new objects."""
+ """
+ Listens for new objects from the SWH Journal, and either:
+
+ * runs the indexer with the name passed as argument, if any
+ * schedules tasks to run relevant indexers (currently, only
+ origin-intrinsic-metadata) on these new objects otherwise.
+
+ Passing '*' as indexer name runs all indexers.
+ """
import functools
import warnings
from swh.indexer.indexer import ObjectsDict
from swh.indexer.journal_client import process_journal_objects
from swh.journal.client import get_journal_client
from swh.scheduler import get_scheduler
cfg = ctx.obj["config"]
journal_cfg = cfg.get("journal", {})
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)
brokers = brokers or journal_cfg.get("brokers")
if not brokers:
raise ValueError("The brokers configuration is mandatory.")
prefix = prefix or journal_cfg.get("prefix")
group_id = group_id or journal_cfg.get("group_id")
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get(
"origin_metadata_task_type"
)
stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
- worker_fn: Callable[[ObjectsDict], Dict]
+ object_types = set()
+ worker_fns: List[Callable[[ObjectsDict], Dict]] = []
if indexer is None:
warnings.warn(
"'swh indexer journal-client' with no argument creates scheduler tasks "
"to index, rather than index directly.",
DeprecationWarning,
)
- object_types = ["origin_visit_status"]
- worker_fn = functools.partial(
- process_journal_objects,
- scheduler=scheduler,
- task_names={
- "origin_metadata": origin_metadata_task_type,
- },
+ object_types.add("origin_visit_status")
+ worker_fns.append(
+ functools.partial(
+ process_journal_objects,
+ scheduler=scheduler,
+ task_names={
+ "origin_metadata": origin_metadata_task_type,
+ },
+ )
)
- elif indexer == "origin-intrinsic-metadata":
+
+ if indexer in ("origin-intrinsic-metadata", "*"):
from swh.indexer.metadata import OriginMetadataIndexer
- object_types = ["origin_visit_status"]
+ object_types.add("origin_visit_status")
idx = OriginMetadataIndexer()
idx.catch_exceptions = False # don't commit offsets if indexation failed
- worker_fn = idx.process_journal_objects
- else:
+ worker_fns.append(idx.process_journal_objects)
+
+ if not worker_fns:
raise click.ClickException(f"Unknown indexer: {indexer}")
client = get_journal_client(
cls="kafka",
brokers=brokers,
prefix=prefix,
group_id=group_id,
- object_types=object_types,
+ object_types=list(object_types),
stop_after_objects=stop_after_objects,
)
+ def worker_fn(objects: ObjectsDict):
+ for fn in worker_fns:
+ fn(objects)
+
try:
client.process(worker_fn)
except KeyboardInterrupt:
ctx.exit(0)
else:
print("Done.")
finally:
client.close()
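
The journal-client command now collects one callback per selected indexer and wraps them in a single worker_fn. The pattern in isolation looks roughly like the sketch below; the names are generic and not part of the swh-indexer API.

from typing import Callable, Dict, List

def combine_workers(worker_fns: List[Callable[[Dict], Dict]]) -> Callable[[Dict], None]:
    # every registered indexer callback sees the same batch of journal objects;
    # the combined callable is what gets handed to client.process()
    def worker_fn(objects: Dict) -> None:
        for fn in worker_fns:
            fn(objects)
    return worker_fn
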
@indexer_cli_group.command("rpc-serve")
@click.argument("config-path", required=True)
@click.option("--host", default="0.0.0.0", help="Host to run the server")
@click.option("--port", default=5007, type=click.INT, help="Binding port of the server")
@click.option(
"--debug/--nodebug",
default=True,
help="Indicates if the server should run in debug mode",
)
def rpc_server(config_path, host, port, debug):
"""Starts a Software Heritage Indexer RPC HTTP server."""
from swh.indexer.storage.api.server import app, load_and_check_config
api_cfg = load_and_check_config(config_path, type="any")
app.config.update(api_cfg)
app.run(host, port=int(port), debug=bool(debug))
def main():
return indexer_cli_group(auto_envvar_prefix="SWH_INDEXER")
if __name__ == "__main__":
main()
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 3341a75..d4f3e9e 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,659 +1,655 @@
# Copyright (C) 2016-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
from contextlib import contextmanager
import logging
import os
import shutil
import tempfile
from typing import (
Any,
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Set,
TypeVar,
Union,
)
import warnings
import sentry_sdk
from typing_extensions import TypedDict
from swh.core import utils
from swh.core.config import load_from_envvar, merge_configs
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.model import hashutil
from swh.model.model import Origin, Revision, Sha1Git
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.factory import get_objstorage
from swh.scheduler import CONFIG as SWH_CONFIG
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
class ObjectsDict(TypedDict, total=False):
revision: List[Dict]
origin: List[Dict]
origin_visit_status: List[Dict]
@contextmanager
def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]:
"""Write the sha1's content in a temporary file.
Args:
filename: one of sha1's many filenames
data: the sha1's content to write in temporary
file
working_directory: the directory into which the
file is written
Returns:
The path to the temporary file created. That file is
filled in with the raw content's data.
"""
os.makedirs(working_directory, exist_ok=True)
temp_dir = tempfile.mkdtemp(dir=working_directory)
content_path = os.path.join(temp_dir, filename)
with open(content_path, "wb") as f:
f.write(data)
yield content_path
shutil.rmtree(temp_dir)
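
A hedged usage sketch for write_to_temp(); the filename, payload and working directory below are made up for illustration.

from swh.indexer.indexer import write_to_temp

# the data is written to a file under a fresh temporary directory inside
# working_directory; that directory is removed when the block exits
with write_to_temp(
    filename="setup.py",
    data=b"from setuptools import setup\n",
    working_directory="/tmp/swh-indexer-scratch",
) as content_path:
    with open(content_path, "rb") as f:
        assert f.read().startswith(b"from setuptools")
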
DEFAULT_CONFIG = {
INDEXER_CFG_KEY: {"cls": "memory"},
"storage": {"cls": "memory"},
"objstorage": {"cls": "memory"},
}
TId = TypeVar("TId")
"""type of the ids of index()ed objects."""
TData = TypeVar("TData")
"""type of the objects passed to index()."""
TResult = TypeVar("TResult")
"""return type of index()"""
class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta):
"""Base class for indexers to inherit from.
The main entry point is the :func:`run` function which is in
charge of triggering the computations on the batch dict/ids
received.
Indexers can:
- filter out ids whose data has already been indexed.
- retrieve ids data from storage or objstorage
- index this data depending on the object and store the result in
storage.
To implement a new object type indexer, inherit from the
BaseIndexer and implement indexing:
:meth:`~BaseIndexer.run`:
object_ids are different depending on object. For example: sha1 for
content, sha1_git for revision, directory, release, and id for origin
To implement a new concrete indexer, inherit from the object level
classes: :class:`ContentIndexer`, :class:`RevisionIndexer`,
:class:`OriginIndexer`.
Then you need to implement the following functions:
:meth:`~BaseIndexer.filter`:
filter out data already indexed (in storage).
:meth:`~BaseIndexer.index_object`:
compute index on id with data (retrieved from the storage or the
objstorage by the id key) and return the resulting index computation.
:meth:`~BaseIndexer.persist_index_computations`:
persist the results of multiple index computations in the storage.
The new indexer implementation can also override the following functions:
:meth:`~BaseIndexer.prepare`:
Configuration preparation for the indexer. When overriding, this must
call the `super().prepare()` instruction.
:meth:`~BaseIndexer.check`:
Configuration check for the indexer. When overriding, this must call the
`super().check()` instruction.
:meth:`~BaseIndexer.register_tools`:
This should return a dict of the tool(s) to use when indexing or
filtering.
"""
results: List[TResult]
USE_TOOLS = True
catch_exceptions = True
"""Prevents exceptions in `index()` from raising too high. Set to False
in tests to properly catch all exceptions."""
scheduler: Any
storage: StorageInterface
objstorage: Any
idx_storage: IndexerStorageInterface
def __init__(self, config=None, **kw) -> None:
"""Prepare and check that the indexer is ready to run."""
super().__init__()
if config is not None:
self.config = config
elif SWH_CONFIG:
self.config = SWH_CONFIG.copy()
else:
self.config = load_from_envvar()
self.config = merge_configs(DEFAULT_CONFIG, self.config)
self.prepare()
self.check()
self.log.debug("%s: config=%s", self, self.config)
def prepare(self) -> None:
"""Prepare the indexer's needed runtime configuration.
Without this step, the indexer cannot possibly run.
"""
config_storage = self.config.get("storage")
if config_storage:
self.storage = get_storage(**config_storage)
self.objstorage = get_objstorage(**self.config["objstorage"])
idx_storage = self.config[INDEXER_CFG_KEY]
self.idx_storage = get_indexer_storage(**idx_storage)
_log = logging.getLogger("requests.packages.urllib3.connectionpool")
_log.setLevel(logging.WARN)
self.log = logging.getLogger("swh.indexer")
if self.USE_TOOLS:
self.tools = list(self.register_tools(self.config.get("tools", [])))
self.results = []
@property
def tool(self) -> Dict:
return self.tools[0]
def check(self) -> None:
"""Check the indexer's configuration is ok before proceeding.
If ok, does nothing. If not raise error.
"""
if self.USE_TOOLS and not self.tools:
raise ValueError("Tools %s is unknown, cannot continue" % self.tools)
def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare the tool dict to be compliant with the storage api."""
return {"tool_%s" % key: value for key, value in tool.items()}
def register_tools(
self, tools: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
"""Permit to register tools to the storage.
Add a sensible default which can be overridden if not
sufficient. (For now, all indexers use only one tool)
Expects the self.config['tools'] property to be set with
one or more tools.
Args:
tools: Either a dict or a list of dict.
Returns:
list: List of dicts with additional id key.
Raises:
ValueError: if not a list nor a dict.
"""
if isinstance(tools, list):
tools = list(map(self._prepare_tool, tools))
elif isinstance(tools, dict):
tools = [self._prepare_tool(tools)]
else:
raise ValueError("Configuration tool(s) must be a dict or list!")
if tools:
return self.idx_storage.indexer_configuration_add(tools)
else:
return []
def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]:
"""Index computation for the id and associated raw data.
Args:
id: identifier or Dict object
data: id's data from storage or objstorage depending on
object type
Returns:
dict: a dict that makes sense for the
:meth:`.persist_index_computations` method.
"""
raise NotImplementedError()
def filter(self, ids: List[TId]) -> Iterator[TId]:
"""Filter missing ids for that particular indexer.
Args:
ids: list of ids
Yields:
iterator of missing ids
"""
yield from ids
@abc.abstractmethod
def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]:
"""Persist the computation resulting from the index.
Args:
results: List of results. One result is the
result of the index function.
Returns:
a summary dict of what has been inserted in the storage
"""
return {}
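
To make the contract described in the BaseIndexer docstring concrete, here is a minimal, hypothetical content indexer: the class name, the result shape and the summary key are invented for illustration, and a real indexer would persist its results through one of the idx_storage endpoints.

from typing import Any, Dict, List, Optional

from swh.indexer.indexer import ContentIndexer
from swh.indexer.storage import Sha1

class ContentLengthIndexer(ContentIndexer[Dict[str, Any]]):
    USE_TOOLS = False  # this toy indexer does not register a tool

    def index(self, id: Sha1, data: Optional[bytes] = None, **kwargs) -> List[Dict[str, Any]]:
        # one result dict per content, keyed by the content's sha1
        return [{"id": id, "length": len(data or b"")}]

    def persist_index_computations(self, results: List[Dict[str, Any]]) -> Dict[str, int]:
        # a real indexer would call an idx_storage *_add endpoint here
        return {"content_length:add": len(results)}

With a configuration pointing at a storage, an objstorage and an indexer storage, calling ContentLengthIndexer(config=...).run(list_of_sha1s) would then follow the flow implemented in ContentIndexer.run below.
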
class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]):
"""A content indexer working on a list of ids directly.
To work on indexer partition, use the :class:`ContentPartitionIndexer`
instead.
Note: :class:`ContentIndexer` is not an instantiable object. To
use it, one should inherit from this class and override the
methods mentioned in the :class:`BaseIndexer` class.
"""
def run(self, ids: List[Sha1], **kwargs) -> Dict:
"""Given a list of ids:
- retrieve the content from the storage
- execute the indexing computations
- store the results
Args:
ids (Iterable[Union[bytes, str]]): sha1's identifier list
**kwargs: passed to the `index` method
Returns:
A summary Dict of the task's status
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
sha1s = [
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
]
results = []
summary: Dict = {"status": "uneventful"}
try:
for sha1 in sha1s:
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
self.log.warning(
"Content %s not found in objstorage"
% hashutil.hash_to_hex(sha1)
)
continue
res = self.index(sha1, raw_content, **kwargs)
if res: # If no results, skip it
results.extend(res)
summary["status"] = "eventful"
summary = self.persist_index_computations(results)
self.results = results
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when reading contents metadata.")
sentry_sdk.capture_exception()
summary["status"] = "failed"
return summary
class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]):
"""A content partition indexer.
This expects as input a partition_id and a nb_partitions. This will then index the
contents within that partition.
To work on a list of ids, use the :class:`ContentIndexer` instead.
Note: :class:`ContentPartitionIndexer` is not an instantiable
object. To use it, one should inherit from this class and override
the methods mentioned in the :class:`BaseIndexer` class.
"""
@abc.abstractmethod
def indexed_contents_in_partition(
self, partition_id: int, nb_partitions: int
) -> Iterable[Sha1]:
"""Retrieve indexed contents within range [start, end].
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
page_token: opaque token used for pagination
"""
pass
def _list_contents_to_index(
self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
) -> Iterable[Sha1]:
"""Compute from storage the new contents to index in the partition_id . The already
indexed contents are skipped.
Args:
partition_id: Index of the partition to fetch data from
nb_partitions: Total number of partition
indexed: Set of content already indexed.
Yields:
Sha1 id (bytes) of contents to index
"""
if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
raise TypeError(
f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
)
next_page_token = None
while True:
result = self.storage.content_get_partition(
partition_id, nb_partitions, page_token=next_page_token
)
contents = result.results
for c in contents:
_id = hashutil.hash_to_bytes(c.sha1)
if _id in indexed:
continue
yield _id
next_page_token = result.next_page_token
if next_page_token is None:
break
def _index_contents(
self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
) -> Iterator[TResult]:
"""Index the contents within the partition_id.
Args:
start: Starting bound from range identifier
end: End range identifier
indexed: Set of content already indexed.
Yields:
indexing result as dict to persist in the indexer backend
"""
for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
self.log.warning(f"Content {sha1.hex()} not found in objstorage")
continue
yield from self.index(sha1, raw_content, **kwargs)
def _index_with_skipping_already_done(
self, partition_id: int, nb_partitions: int
) -> Iterator[TResult]:
"""Index not already indexed contents within the partition partition_id
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
Yields:
indexing result as dict to persist in the indexer backend
"""
already_indexed_contents = set(
self.indexed_contents_in_partition(partition_id, nb_partitions)
)
return self._index_contents(
partition_id, nb_partitions, already_indexed_contents
)
def run(
self,
partition_id: int,
nb_partitions: int,
skip_existing: bool = True,
**kwargs,
) -> Dict:
"""Given a partition of content ids, index the contents within.
Either the indexer is incremental (filter out existing computed data) or it
computes everything from scratch.
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
skip_existing: Skip existing indexed data
(default) or not
**kwargs: passed to the `index` method
Returns:
dict with the indexing task status
"""
summary: Dict[str, Any] = {"status": "uneventful"}
count = 0
try:
if skip_existing:
gen = self._index_with_skipping_already_done(
partition_id, nb_partitions
)
else:
gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
count_object_added_key: Optional[str] = None
for contents in utils.grouper(gen, n=self.config["write_batch_size"]):
res = self.persist_index_computations(list(contents))
if not count_object_added_key:
count_object_added_key = list(res.keys())[0]
count += res[count_object_added_key]
if count > 0:
summary["status"] = "eventful"
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when computing metadata.")
sentry_sdk.capture_exception()
summary["status"] = "failed"
if count > 0 and count_object_added_key:
summary[count_object_added_key] = count
return summary
class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Origin indexing using the run method
Note: the :class:`OriginIndexer` is not an instantiable object.
To use it in another context one should inherit from this class
and override the methods mentioned in the :class:`BaseIndexer`
class.
"""
def run(self, origin_urls: List[str], **kwargs) -> Dict:
"""Given a list of origin urls:
- retrieve origins from storage
- execute the indexing computations
- store the results
Args:
origin_urls: list of origin urls.
**kwargs: passed to the `index` method
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
origins = [{"url": url} for url in origin_urls]
return self.process_journal_objects({"origin": origins})
def process_journal_objects(self, objects: ObjectsDict) -> Dict:
"""Worker function for ``JournalClient``. Expects ``objects`` to have a single
key, either ``origin`` or ``"origin_visit_status"``."""
- # TODO: add support for subscribing to other topics? Currently, this is
- # not implemented because no indexer would use it.
- assert set(objects) <= {"origin", "origin_visit_status"}
-
origins = [
Origin(url=status["origin"])
for status in objects.get("origin_visit_status", [])
if status["status"] == "full"
] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])]
summary: Dict[str, Any] = {"status": "uneventful"}
try:
results = self.index_list(
origins,
check_origin_known=False,
# no need to check they exist, as we just received either an origin or
# visit status; which cannot be created by swh-storage unless the origin
# already exists
)
except Exception:
if not self.catch_exceptions:
raise
summary["status"] = "failed"
return summary
summary_persist = self.persist_index_computations(results)
self.results = results
if summary_persist:
for value in summary_persist.values():
if value > 0:
summary["status"] = "eventful"
summary.update(summary_persist)
return summary
def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]:
results = []
for origin in origins:
try:
results.extend(self.index(origin.url, **kwargs))
except Exception:
self.log.exception("Problem when processing origin %s", origin.url)
sentry_sdk.capture_exception()
raise
return results
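
For reference, the batches handed to process_journal_objects() look roughly like the dict below; real journal messages carry more fields (visit, date, snapshot, ...), but only "origin" and "status" are read here, and the URLs are made up.

batch = {
    "origin_visit_status": [
        {"origin": "https://example.org/user/repo", "status": "full"},
        {"origin": "https://example.org/user/other", "status": "ongoing"},  # filtered out
    ],
}
# a concrete OriginIndexer subclass would then run:
# summary = indexer.process_journal_objects(batch)
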
class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Revision indexing using the run method
Note: the :class:`RevisionIndexer` is not an instantiable object.
To use it in another context one should inherit from this class
and override the methods mentioned in the :class:`BaseIndexer`
class.
"""
def run(self, ids: List[Sha1Git], **kwargs) -> Dict:
"""Given a list of sha1_gits:
- retrieve revisions from storage
- execute the indexing computations
- store the results
Args:
ids: sha1_git's identifier list
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
revision_ids = [
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
]
revisions = []
for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)):
if not rev:
# TODO: call self.index() with rev=None?
self.log.warning(
"Revision %s not found in storage", hashutil.hash_to_hex(rev_id)
)
continue
revisions.append(rev.to_dict())
return self.process_journal_objects({"revision": revisions})
def process_journal_objects(self, objects: ObjectsDict) -> Dict:
"""Worker function for ``JournalClient``. Expects ``objects`` to have a single
key, ``"revision"``."""
assert set(objects) == {"revision"}
summary: Dict[str, Any] = {"status": "uneventful"}
results = []
for rev in objects["revision"]:
try:
results.extend(self.index(rev["id"], Revision.from_dict(rev)))
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when processing revision")
sentry_sdk.capture_exception()
summary["status"] = "failed"
summary_persist = self.persist_index_computations(results)
if summary_persist:
for value in summary_persist.values():
if value > 0:
summary["status"] = "eventful"
summary.update(summary_persist)
self.results = results
return summary
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index fd57816..763c896 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,653 +1,655 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from functools import reduce
import re
from typing import Any, Dict, List
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Consumer
import pytest
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
OriginIntrinsicMetadataRow,
RevisionIntrinsicMetadataRow,
)
from swh.journal.writer import get_journal_writer
from swh.model.hashutil import hash_to_bytes
from swh.model.model import OriginVisitStatus
from .utils import REVISION
def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
tools: List[Dict[str, Any]] = [
{
"tool_name": "tool %d" % i,
"tool_version": "0.0.1",
"tool_configuration": {},
}
for i in range(2)
]
tools = idx_storage.indexer_configuration_add(tools)
origin_metadata = [
OriginIntrinsicMetadataRow(
id="file://dev/%04d" % origin_id,
from_revision=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
revision_metadata = [
RevisionIntrinsicMetadataRow(
id=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
idx_storage.revision_intrinsic_metadata_add(revision_metadata)
idx_storage.origin_intrinsic_metadata_add(origin_metadata)
return [tool["id"] for tool in tools]
def _origins_in_task_args(tasks):
"""Returns the set of origins contained in the arguments of the
provided tasks (assumed to be of type index-origin-metadata)."""
return reduce(
set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
)
def _assert_tasks_for_origins(tasks, origins):
expected_kwargs = {}
assert {task["type"] for task in tasks} == {"index-origin-metadata"}
assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
for task in tasks:
assert task["arguments"]["kwargs"] == expected_kwargs, task
assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins])
@pytest.fixture
def cli_runner():
return CliRunner()
def test_cli_mapping_list(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list"],
catch_exceptions=False,
)
expected_output = "\n".join(
[
"cff",
"codemeta",
"gemspec",
"maven",
"npm",
"pkg-info",
"",
] # must be sorted for test to pass
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
def test_cli_mapping_list_terms(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list-terms"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
def test_cli_mapping_list_terms_exclude(cli_runner, swh_config):
result = cli_runner.invoke(
indexer_cli_group,
["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert not re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_empty_db(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_divisor(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 9
_assert_tasks_for_origins(tasks, range(90))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_dry_run(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"--dry-run",
"reindex_origin_metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_nondivisor(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
fill_idx_storage(idx_storage, 70)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--batch-size",
"20",
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (60 origins).\n"
"Scheduled 4 tasks (70 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 4
_assert_tasks_for_origins(tasks, range(70))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_mapping(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 2
_assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_two_mappings(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"--config-file",
swh_config,
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
"--mapping",
"mapping2",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 3
_assert_tasks_for_origins(
tasks,
[
1,
11,
21,
31,
41,
51,
61,
71,
81,
91,
101,
2,
12,
22,
32,
42,
52,
62,
72,
82,
92,
102,
],
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
def test_cli_origin_metadata_reindex_filter_one_tool(
cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
tool_ids = fill_idx_storage(idx_storage, 110)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"schedule",
"reindex_origin_metadata",
"--tool-id",
str(tool_ids[0]),
],
catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (55 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 6
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
def now():
return datetime.datetime.now(tz=datetime.timezone.utc)
def test_cli_journal_client_schedule(
cli_runner,
swh_config,
indexer_scheduler,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
flush_timeout=3, # fail early if something is going wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
"--origin-metadata-task-type",
"index-origin-metadata",
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
# This can be split into multiple tasks but no more than the origin-visit-statuses
# written in the journal
assert len(tasks) <= len(visit_statuses_full)
actual_origins = []
for task in tasks:
actual_task = dict(task)
assert actual_task["type"] == "index-origin-metadata"
scheduled_origins = actual_task["arguments"]["args"][0]
actual_origins.extend(scheduled_origins)
assert set(actual_origins) == {vs.origin for vs in visit_statuses_full}
def test_cli_journal_client_without_brokers(
cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Without brokers configuration, the cli fails."""
with pytest.raises(ValueError, match="brokers"):
cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
],
catch_exceptions=False,
)
+@pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"])
def test_cli_journal_client_index(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
storage,
mocker,
swh_indexer_config,
+ indexer_name: str,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
flush_timeout=3, # fail early if something is going wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
storage.revision_add([REVISION])
mocker.patch(
"swh.indexer.origin_head.OriginHeadIndexer.index",
return_value=[{"revision_id": REVISION.id}],
)
mocker.patch(
"swh.indexer.metadata.RevisionMetadataIndexer.index",
return_value=[
RevisionIntrinsicMetadataRow(
id=REVISION.id,
indexer_configuration_id=1,
mappings=["cff"],
metadata={"foo": "bar"},
)
],
)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
- "origin-intrinsic-metadata",
+ indexer_name,
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
],
catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
results = idx_storage.origin_intrinsic_metadata_get(
[status.origin for status in visit_statuses]
)
expected_results = [
OriginIntrinsicMetadataRow(
id=status.origin,
from_revision=REVISION.id,
tool={"id": 1, **swh_indexer_config["tools"]},
mappings=["cff"],
metadata={"foo": "bar"},
)
for status in sorted(visit_statuses_full, key=lambda r: r.origin)
]
assert sorted(results, key=lambda r: r.id) == expected_results
