diff --git a/PKG-INFO b/PKG-INFO
index 87a7e3d..ff967a3 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.6.3
+Version: 0.6.4
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
- mimetype
- ctags
- language
- fossology-license
- metadata
- revision:
- metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- store those information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate file into translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
in content_metadata table in storage or run content indexer to translate
files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/conftest.py b/conftest.py
index 1e93ce8..8392690 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,27 +1,31 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import settings
import pytest
# define tests profile. Full documentation is at:
# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
settings.register_profile("fast", max_examples=5, deadline=5000)
settings.register_profile("slow", max_examples=20, deadline=5000)
# Ignore the following modules because wsgi module fails as no
# configuration file is found (--doctest-modules forces the module
# loading)
collect_ignore = ["swh/indexer/storage/api/wsgi.py"]
-# we use the swh_scheduler fixture
-pytest_plugins = ["swh.scheduler.pytest_plugin"]
+# we use the various swh fixtures
+pytest_plugins = [
+ "swh.scheduler.pytest_plugin",
+ "swh.storage.pytest_plugin",
+ "swh.core.db.pytest_plugin",
+]
@pytest.fixture(scope="session")
def swh_scheduler_celery_includes(swh_scheduler_celery_includes):
return swh_scheduler_celery_includes + [
"swh.indexer.tasks",
]
diff --git a/docs/cli.rst b/docs/cli.rst
new file mode 100644
index 0000000..2ad5a52
--- /dev/null
+++ b/docs/cli.rst
@@ -0,0 +1,8 @@
+.. _swh-indexer-cli:
+
+Command-line interface
+======================
+
+.. click:: swh.indexer.cli:indexer_cli_group
+ :prog: swh indexer
+ :nested: full
diff --git a/docs/index.rst b/docs/index.rst
index b80d6f4..ed79b46 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,25 +1,26 @@
.. _swh-indexer:
Software Heritage - Indexer
===========================
Tools and workers used to mine the content of the archive and extract derived
information from archive source code artifacts.
.. toctree::
:maxdepth: 1
:caption: Contents:
README.md
dev-info.rst
metadata-workflow.rst
Reference Documentation
-----------------------
.. toctree::
:maxdepth: 2
+ cli
/apidoc/swh.indexer
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 40b5ff6..854fbce 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,6 +1,6 @@
-swh.core[db,http] >= 0.3
+swh.core[db,http] >= 0.9.1
swh.model >= 0.0.15
swh.objstorage >= 0.2.2
swh.scheduler >= 0.5.2
swh.storage >= 0.12.0
swh.journal >= 0.1.0
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 87a7e3d..ff967a3 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,71 +1,71 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.6.3
+Version: 0.6.4
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
Description: swh-indexer
============
Tools to compute multiple indexes on SWH's raw contents:
- content:
- mimetype
- ctags
- language
- fossology-license
- metadata
- revision:
- metadata
An indexer is in charge of:
- looking up objects
- extracting information from those objects
- store those information in the swh-indexer db
There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
- origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
Current content indexers:
- mimetype (queue swh_indexer_content_mimetype): detect the encoding
and mimetype
- language (queue swh_indexer_content_language): detect the
programming language
- ctags (queue swh_indexer_content_ctags): compute tags information
- fossology-license (queue swh_indexer_fossology_license): compute the
license
- metadata: translate file into translated_metadata dict
Current revision indexers:
- metadata: detects files containing metadata and retrieves translated_metadata
in content_metadata table in storage or run content indexer to translate
files.
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.indexer.egg-info/SOURCES.txt b/swh.indexer.egg-info/SOURCES.txt
index 88bcdbe..9f4e312 100644
--- a/swh.indexer.egg-info/SOURCES.txt
+++ b/swh.indexer.egg-info/SOURCES.txt
@@ -1,141 +1,142 @@
.gitignore
.pre-commit-config.yaml
AUTHORS
CODE_OF_CONDUCT.md
CONTRIBUTORS
LICENSE
MANIFEST.in
Makefile
Makefile.local
README.md
codemeta.json
conftest.py
mypy.ini
pyproject.toml
pytest.ini
requirements-swh.txt
requirements-test.txt
requirements.txt
setup.cfg
setup.py
tox.ini
docs/.gitignore
docs/Makefile
docs/Makefile.local
docs/README.md
+docs/cli.rst
docs/conf.py
docs/dev-info.rst
docs/index.rst
docs/metadata-workflow.rst
docs/_static/.placeholder
docs/_templates/.placeholder
docs/images/.gitignore
docs/images/Makefile
docs/images/tasks-metadata-indexers.uml
sql/bin/db-upgrade
sql/bin/dot_add_content
sql/doc/json
sql/doc/json/.gitignore
sql/doc/json/Makefile
sql/doc/json/indexer_configuration.tool_configuration.schema.json
sql/doc/json/revision_metadata.translated_metadata.json
sql/json/.gitignore
sql/json/Makefile
sql/json/indexer_configuration.tool_configuration.schema.json
sql/json/revision_metadata.translated_metadata.json
sql/upgrades/115.sql
sql/upgrades/116.sql
sql/upgrades/117.sql
sql/upgrades/118.sql
sql/upgrades/119.sql
sql/upgrades/120.sql
sql/upgrades/121.sql
sql/upgrades/122.sql
sql/upgrades/123.sql
sql/upgrades/124.sql
sql/upgrades/125.sql
sql/upgrades/126.sql
sql/upgrades/127.sql
sql/upgrades/128.sql
sql/upgrades/129.sql
sql/upgrades/130.sql
sql/upgrades/131.sql
sql/upgrades/132.sql
sql/upgrades/133.sql
swh/__init__.py
swh.indexer.egg-info/PKG-INFO
swh.indexer.egg-info/SOURCES.txt
swh.indexer.egg-info/dependency_links.txt
swh.indexer.egg-info/entry_points.txt
swh.indexer.egg-info/requires.txt
swh.indexer.egg-info/top_level.txt
swh/indexer/__init__.py
swh/indexer/cli.py
swh/indexer/codemeta.py
swh/indexer/ctags.py
swh/indexer/fossology_license.py
swh/indexer/indexer.py
swh/indexer/journal_client.py
swh/indexer/metadata.py
swh/indexer/metadata_detector.py
swh/indexer/mimetype.py
swh/indexer/origin_head.py
swh/indexer/py.typed
swh/indexer/rehash.py
swh/indexer/tasks.py
swh/indexer/data/codemeta/CITATION
swh/indexer/data/codemeta/LICENSE
swh/indexer/data/codemeta/codemeta.jsonld
swh/indexer/data/codemeta/crosswalk.csv
swh/indexer/metadata_dictionary/__init__.py
swh/indexer/metadata_dictionary/base.py
swh/indexer/metadata_dictionary/codemeta.py
swh/indexer/metadata_dictionary/maven.py
swh/indexer/metadata_dictionary/npm.py
swh/indexer/metadata_dictionary/python.py
swh/indexer/metadata_dictionary/ruby.py
swh/indexer/sql/10-superuser-init.sql
swh/indexer/sql/20-enums.sql
swh/indexer/sql/30-schema.sql
swh/indexer/sql/50-data.sql
swh/indexer/sql/50-func.sql
swh/indexer/sql/60-indexes.sql
swh/indexer/storage/__init__.py
swh/indexer/storage/converters.py
swh/indexer/storage/db.py
swh/indexer/storage/exc.py
swh/indexer/storage/in_memory.py
swh/indexer/storage/interface.py
swh/indexer/storage/metrics.py
swh/indexer/storage/model.py
swh/indexer/storage/writer.py
swh/indexer/storage/api/__init__.py
swh/indexer/storage/api/client.py
swh/indexer/storage/api/serializers.py
swh/indexer/storage/api/server.py
swh/indexer/tests/__init__.py
swh/indexer/tests/conftest.py
swh/indexer/tests/tasks.py
swh/indexer/tests/test_cli.py
swh/indexer/tests/test_codemeta.py
swh/indexer/tests/test_ctags.py
swh/indexer/tests/test_fossology_license.py
swh/indexer/tests/test_indexer.py
swh/indexer/tests/test_journal_client.py
swh/indexer/tests/test_metadata.py
swh/indexer/tests/test_mimetype.py
swh/indexer/tests/test_origin_head.py
swh/indexer/tests/test_origin_metadata.py
swh/indexer/tests/test_tasks.py
swh/indexer/tests/utils.py
swh/indexer/tests/storage/__init__.py
swh/indexer/tests/storage/conftest.py
swh/indexer/tests/storage/generate_data_test.py
swh/indexer/tests/storage/test_api_client.py
swh/indexer/tests/storage/test_converters.py
swh/indexer/tests/storage/test_in_memory.py
swh/indexer/tests/storage/test_init.py
swh/indexer/tests/storage/test_metrics.py
swh/indexer/tests/storage/test_model.py
swh/indexer/tests/storage/test_server.py
swh/indexer/tests/storage/test_storage.py
\ No newline at end of file
diff --git a/swh.indexer.egg-info/requires.txt b/swh.indexer.egg-info/requires.txt
index 25feb9c..c834180 100644
--- a/swh.indexer.egg-info/requires.txt
+++ b/swh.indexer.egg-info/requires.txt
@@ -1,19 +1,19 @@
click
python-magic>=0.4.13
pyld
xmltodict
typing-extensions
-swh.core[db,http]>=0.3
+swh.core[db,http]>=0.9.1
swh.model>=0.0.15
swh.objstorage>=0.2.2
swh.scheduler>=0.5.2
swh.storage>=0.12.0
swh.journal>=0.1.0
[testing]
confluent-kafka
pytest
pytest-mock
hypothesis>=3.11.0
swh.scheduler[testing]>=0.5.0
swh.storage[testing]>=0.10.0
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
index e423ccf..ab8b8b3 100644
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -1,185 +1,184 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
+from swh.core.api.classes import stream_results
from swh.core.config import merge_configs
-from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1
+from swh.indexer.storage.interface import IndexerStorageInterface, Sha1
from swh.indexer.storage.model import ContentLicenseRow
from swh.model import hashutil
from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp
logger = logging.getLogger(__name__)
def compute_license(path) -> Dict:
"""Determine license from file at path.
Args:
path: filepath to determine the license
Returns:
dict: A dict with the following keys:
- licenses ([str]): associated detected licenses to path
- path (bytes): content filepath
"""
try:
properties = subprocess.check_output(["nomossa", path], universal_newlines=True)
if properties:
res = properties.rstrip().split(" contains license(s) ")
licenses = res[1].split(",")
else:
licenses = []
return {
"licenses": licenses,
"path": path,
}
except subprocess.CalledProcessError:
from os import path as __path
logger.exception(
"Problem during license detection for sha1 %s" % __path.basename(path)
)
return {
"licenses": [],
"path": path,
}
DEFAULT_CONFIG: Dict[str, Any] = {
"workdir": "/tmp/swh/indexer.fossology.license",
"tools": {
"name": "nomos",
"version": "3.1.0rc2-31-ga2cbb8c",
"configuration": {"command_line": "nomossa <filepath>",},
},
"write_batch_size": 1000,
}
class MixinFossologyLicenseIndexer:
"""Mixin fossology license indexer.
See :class:`FossologyLicenseIndexer` and
:class:`FossologyLicensePartitionIndexer`
"""
tool: Any
idx_storage: IndexerStorageInterface
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = merge_configs(DEFAULT_CONFIG, self.config)
self.working_directory = self.config["workdir"]
def index(
self, id: Sha1, data: Optional[bytes] = None, **kwargs
) -> List[ContentLicenseRow]:
"""Index sha1s' content and store result.
Args:
id (bytes): content's identifier
raw_content (bytes): associated raw content to content id
Returns:
dict: A dict, representing a content_license, with keys:
- id (bytes): content's identifier (sha1)
- license (bytes): license in bytes
- path (bytes): path
- indexer_configuration_id (int): tool used to compute the output
"""
assert data is not None
with write_to_temp(
filename=hashutil.hash_to_hex(id), # use the id as pathname
data=data,
working_directory=self.working_directory,
) as content_path:
properties = compute_license(path=content_path)
return [
ContentLicenseRow(
id=id, indexer_configuration_id=self.tool["id"], license=license,
)
for license in properties["licenses"]
]
def persist_index_computations(
self, results: List[ContentLicenseRow]
) -> Dict[str, int]:
"""Persist the results in storage.
Args:
results: list of content_license dict with the
following keys:
- id (bytes): content's identifier (sha1)
- license (bytes): license in bytes
- path (bytes): path
"""
return self.idx_storage.content_fossology_license_add(results)
class FossologyLicenseIndexer(
MixinFossologyLicenseIndexer, ContentIndexer[ContentLicenseRow]
):
"""Indexer in charge of:
- filtering out content already indexed
- reading content from objstorage per the content's id (sha1)
- computing {license, encoding} from that content
- store result in storage
"""
def filter(self, ids):
"""Filter out known sha1s and return only missing ones.
"""
yield from self.idx_storage.content_fossology_license_missing(
({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
)
class FossologyLicensePartitionIndexer(
MixinFossologyLicenseIndexer, ContentPartitionIndexer[ContentLicenseRow]
):
"""FossologyLicense Range Indexer working on range/partition of content identifiers.
- filters out the non textual content
- (optionally) filters out content already indexed (cf
:meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
def indexed_contents_in_partition(
self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
- ) -> PagedResult[Sha1]:
+ ) -> Iterable[Sha1]:
"""Retrieve indexed content id within the partition id
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
page_token: opaque token used for pagination
-
- Returns:
- PagedResult of Sha1. If next_page_token is None, there is no more data
- to fetch
-
"""
- return self.idx_storage.content_fossology_license_get_partition(
- self.tool["id"], partition_id, nb_partitions, page_token=page_token
+ return stream_results(
+ self.idx_storage.content_fossology_license_get_partition,
+ self.tool["id"],
+ partition_id,
+ nb_partitions,
)
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 984ffbf..85c0d72 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,613 +1,615 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
from contextlib import contextmanager
import logging
import os
import shutil
import tempfile
-from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union
+from typing import (
+ Any,
+ Dict,
+ Generic,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ TypeVar,
+ Union,
+)
import warnings
from swh.core import utils
from swh.core.config import load_from_envvar, merge_configs
-from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage
+from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.model import hashutil
from swh.model.model import Revision, Sha1Git
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.factory import get_objstorage
from swh.scheduler import CONFIG as SWH_CONFIG
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
@contextmanager
def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]:
"""Write the sha1's content in a temporary file.
Args:
filename: one of sha1's many filenames
data: the sha1's content to write in temporary
file
working_directory: the directory into which the
file is written
Returns:
The path to the temporary file created. That file is
filled in with the raw content's data.
"""
os.makedirs(working_directory, exist_ok=True)
temp_dir = tempfile.mkdtemp(dir=working_directory)
content_path = os.path.join(temp_dir, filename)
with open(content_path, "wb") as f:
f.write(data)
yield content_path
shutil.rmtree(temp_dir)
DEFAULT_CONFIG = {
INDEXER_CFG_KEY: {"cls": "memory"},
"storage": {"cls": "memory"},
"objstorage": {"cls": "memory"},
}
TId = TypeVar("TId")
"""type of the ids of index()ed objects."""
TData = TypeVar("TData")
"""type of the objects passed to index()."""
TResult = TypeVar("TResult")
"""return type of index()"""
class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta):
"""Base class for indexers to inherit from.
The main entry point is the :func:`run` function which is in
charge of triggering the computations on the batch dict/ids
received.
Indexers can:
- filter out ids whose data has already been indexed.
- retrieve ids data from storage or objstorage
- index this data depending on the object and store the result in
storage.
To implement a new object type indexer, inherit from the
BaseIndexer and implement indexing:
:meth:`~BaseIndexer.run`:
object_ids are different depending on object. For example: sha1 for
content, sha1_git for revision, directory, release, and id for origin
To implement a new concrete indexer, inherit from the object level
classes: :class:`ContentIndexer`, :class:`RevisionIndexer`,
:class:`OriginIndexer`.
Then you need to implement the following functions:
:meth:`~BaseIndexer.filter`:
filter out data already indexed (in storage).
:meth:`~BaseIndexer.index_object`:
compute index on id with data (retrieved from the storage or the
objstorage by the id key) and return the resulting index computation.
:meth:`~BaseIndexer.persist_index_computations`:
persist the results of multiple index computations in the storage.
The new indexer implementation can also override the following functions:
:meth:`~BaseIndexer.prepare`:
Configuration preparation for the indexer. When overriding, this must
call the `super().prepare()` instruction.
:meth:`~BaseIndexer.check`:
Configuration check for the indexer. When overriding, this must call the
`super().check()` instruction.
:meth:`~BaseIndexer.register_tools`:
This should return a dict of the tool(s) to use when indexing or
filtering.
"""
results: List[TResult]
USE_TOOLS = True
catch_exceptions = True
"""Prevents exceptions in `index()` from raising too high. Set to False
in tests to properly catch all exceptions."""
scheduler: Any
storage: StorageInterface
objstorage: Any
idx_storage: IndexerStorageInterface
def __init__(self, config=None, **kw) -> None:
"""Prepare and check that the indexer is ready to run.
"""
super().__init__()
if config is not None:
self.config = config
elif SWH_CONFIG:
self.config = SWH_CONFIG.copy()
else:
self.config = load_from_envvar()
self.config = merge_configs(DEFAULT_CONFIG, self.config)
self.prepare()
self.check()
self.log.debug("%s: config=%s", self, self.config)
def prepare(self) -> None:
"""Prepare the indexer's needed runtime configuration.
Without this step, the indexer cannot possibly run.
"""
config_storage = self.config.get("storage")
if config_storage:
self.storage = get_storage(**config_storage)
self.objstorage = get_objstorage(**self.config["objstorage"])
idx_storage = self.config[INDEXER_CFG_KEY]
self.idx_storage = get_indexer_storage(**idx_storage)
_log = logging.getLogger("requests.packages.urllib3.connectionpool")
_log.setLevel(logging.WARN)
self.log = logging.getLogger("swh.indexer")
if self.USE_TOOLS:
self.tools = list(self.register_tools(self.config.get("tools", [])))
self.results = []
@property
def tool(self) -> Dict:
return self.tools[0]
def check(self) -> None:
"""Check the indexer's configuration is ok before proceeding.
If ok, does nothing. If not raise error.
"""
if self.USE_TOOLS and not self.tools:
raise ValueError("Tools %s is unknown, cannot continue" % self.tools)
def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare the tool dict to be compliant with the storage api.
"""
return {"tool_%s" % key: value for key, value in tool.items()}
def register_tools(
self, tools: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
"""Permit to register tools to the storage.
Add a sensible default which can be overridden if not
sufficient. (For now, all indexers use only one tool)
Expects the self.config['tools'] property to be set with
one or more tools.
Args:
tools: Either a dict or a list of dict.
Returns:
list: List of dicts with additional id key.
Raises:
ValueError: if not a list nor a dict.
"""
if isinstance(tools, list):
tools = list(map(self._prepare_tool, tools))
elif isinstance(tools, dict):
tools = [self._prepare_tool(tools)]
else:
raise ValueError("Configuration tool(s) must be a dict or list!")
if tools:
return self.idx_storage.indexer_configuration_add(tools)
else:
return []
def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]:
"""Index computation for the id and associated raw data.
Args:
id: identifier or Dict object
data: id's data from storage or objstorage depending on
object type
Returns:
dict: a dict that makes sense for the
:meth:`.persist_index_computations` method.
"""
raise NotImplementedError()
def filter(self, ids: List[TId]) -> Iterator[TId]:
"""Filter missing ids for that particular indexer.
Args:
ids: list of ids
Yields:
iterator of missing ids
"""
yield from ids
@abc.abstractmethod
def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]:
"""Persist the computation resulting from the index.
Args:
results: List of results. One result is the
result of the index function.
Returns:
a summary dict of what has been inserted in the storage
"""
return {}
class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]):
"""A content indexer working on a list of ids directly.
To work on indexer partition, use the :class:`ContentPartitionIndexer`
instead.
Note: :class:`ContentIndexer` is not an instantiable object. To
use it, one should inherit from this class and override the
methods mentioned in the :class:`BaseIndexer` class.
"""
def run(self, ids: List[Sha1], **kwargs) -> Dict:
"""Given a list of ids:
- retrieve the content from the storage
- execute the indexing computations
- store the results
Args:
ids (Iterable[Union[bytes, str]]): sha1's identifier list
**kwargs: passed to the `index` method
Returns:
A summary Dict of the task's status
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
sha1s = [
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
]
results = []
summary: Dict = {"status": "uneventful"}
try:
for sha1 in sha1s:
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
self.log.warning(
"Content %s not found in objstorage"
% hashutil.hash_to_hex(sha1)
)
continue
res = self.index(sha1, raw_content, **kwargs)
if res: # If no results, skip it
results.extend(res)
summary["status"] = "eventful"
summary = self.persist_index_computations(results)
self.results = results
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when reading contents metadata.")
summary["status"] = "failed"
return summary
class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]):
"""A content partition indexer.
This expects as input a partition_id and a nb_partitions. This will then index the
contents within that partition.
To work on a list of ids, use the :class:`ContentIndexer` instead.
Note: :class:`ContentPartitionIndexer` is not an instantiable
object. To use it, one should inherit from this class and override
the methods mentioned in the :class:`BaseIndexer` class.
"""
@abc.abstractmethod
def indexed_contents_in_partition(
- self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
- ) -> PagedResult[Sha1]:
+ self, partition_id: int, nb_partitions: int
+ ) -> Iterable[Sha1]:
"""Retrieve indexed contents within range [start, end].
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
page_token: opaque token used for pagination
- Returns:
- PagedResult of Sha1. If next_page_token is None, there is no more data
- to fetch
-
"""
pass
def _list_contents_to_index(
self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
- ) -> Iterator[Sha1]:
+ ) -> Iterable[Sha1]:
"""Compute from storage the new contents to index in the partition_id . The already
indexed contents are skipped.
Args:
partition_id: Index of the partition to fetch data from
nb_partitions: Total number of partition
indexed: Set of content already indexed.
Yields:
Sha1 id (bytes) of contents to index
"""
if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
raise TypeError(
f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
)
next_page_token = None
while True:
result = self.storage.content_get_partition(
partition_id, nb_partitions, page_token=next_page_token
)
contents = result.results
for c in contents:
_id = hashutil.hash_to_bytes(c.sha1)
if _id in indexed:
continue
yield _id
next_page_token = result.next_page_token
if next_page_token is None:
break
def _index_contents(
self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
) -> Iterator[TResult]:
"""Index the contents within the partition_id.
Args:
start: Starting bound from range identifier
end: End range identifier
indexed: Set of content already indexed.
Yields:
indexing result as dict to persist in the indexer backend
"""
for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
self.log.warning(f"Content {sha1.hex()} not found in objstorage")
continue
yield from self.index(sha1, raw_content, **kwargs)
def _index_with_skipping_already_done(
self, partition_id: int, nb_partitions: int
) -> Iterator[TResult]:
"""Index not already indexed contents within the partition partition_id
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
Yields:
indexing result as dict to persist in the indexer backend
"""
- next_page_token = None
- contents = set()
- while True:
- indexed_page = self.indexed_contents_in_partition(
- partition_id, nb_partitions, page_token=next_page_token
- )
- for sha1 in indexed_page.results:
- contents.add(sha1)
- yield from self._index_contents(partition_id, nb_partitions, contents)
- next_page_token = indexed_page.next_page_token
- if next_page_token is None:
- break
+ already_indexed_contents = set(
+ self.indexed_contents_in_partition(partition_id, nb_partitions)
+ )
+
+ return self._index_contents(
+ partition_id, nb_partitions, already_indexed_contents
+ )
def run(
self,
partition_id: int,
nb_partitions: int,
skip_existing: bool = True,
**kwargs,
) -> Dict:
"""Given a partition of content ids, index the contents within.
Either the indexer is incremental (filter out existing computed data) or it
computes everything from scratch.
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
skip_existing: Skip existing indexed data
(default) or not
**kwargs: passed to the `index` method
Returns:
dict with the indexing task status
"""
summary: Dict[str, Any] = {"status": "uneventful"}
count = 0
try:
if skip_existing:
gen = self._index_with_skipping_already_done(
partition_id, nb_partitions
)
else:
gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
count_object_added_key: Optional[str] = None
for contents in utils.grouper(gen, n=self.config["write_batch_size"]):
res = self.persist_index_computations(list(contents))
if not count_object_added_key:
count_object_added_key = list(res.keys())[0]
count += res[count_object_added_key]
if count > 0:
summary["status"] = "eventful"
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when computing metadata.")
summary["status"] = "failed"
if count > 0 and count_object_added_key:
summary[count_object_added_key] = count
return summary
class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Origin indexing using the run method
Note: the :class:`OriginIndexer` is not an instantiable object.
To use it in another context one should inherit from this class
and override the methods mentioned in the :class:`BaseIndexer`
class.
"""
def run(self, origin_urls: List[str], **kwargs) -> Dict:
"""Given a list of origin urls:
- retrieve origins from storage
- execute the indexing computations
- store the results
Args:
origin_urls: list of origin urls.
**kwargs: passed to the `index` method
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
summary: Dict[str, Any] = {"status": "uneventful"}
try:
results = self.index_list(origin_urls, **kwargs)
except Exception:
if not self.catch_exceptions:
raise
summary["status"] = "failed"
return summary
summary_persist = self.persist_index_computations(results)
self.results = results
if summary_persist:
for value in summary_persist.values():
if value > 0:
summary["status"] = "eventful"
summary.update(summary_persist)
return summary
def index_list(self, origin_urls: List[str], **kwargs) -> List[TResult]:
results = []
for origin_url in origin_urls:
try:
results.extend(self.index(origin_url, **kwargs))
except Exception:
self.log.exception("Problem when processing origin %s", origin_url)
raise
return results
class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Revision indexing using the run method
Note: the :class:`RevisionIndexer` is not an instantiable object.
To use it in another context one should inherit from this class
and override the methods mentioned in the :class:`BaseIndexer`
class.
"""
def run(self, ids: List[Sha1Git], **kwargs) -> Dict:
"""Given a list of sha1_gits:
- retrieve revisions from storage
- execute the indexing computations
- store the results
Args:
ids: list of revision identifiers (sha1_git)
"""
if "policy_update" in kwargs:
warnings.warn(
"'policy_update' argument is deprecated and ignored.",
DeprecationWarning,
)
del kwargs["policy_update"]
summary: Dict[str, Any] = {"status": "uneventful"}
results = []
revision_ids = [
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
]
for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)):
if not rev:
# TODO: call self.index() with rev=None?
self.log.warning(
"Revision %s not found in storage", hashutil.hash_to_hex(rev_id)
)
continue
try:
results.extend(self.index(rev_id, rev))
except Exception:
if not self.catch_exceptions:
raise
self.log.exception("Problem when processing revision")
summary["status"] = "failed"
return summary
summary_persist = self.persist_index_computations(results)
if summary_persist:
for value in summary_persist.values():
if value > 0:
summary["status"] = "eventful"
summary.update(summary_persist)
self.results = results
return summary
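Reviewer note on `ContentPartitionIndexer.run` above: the loop persists results in batches of `write_batch_size` and sums the count reported under the first key of each persist result. The following is a minimal standalone sketch of that flow, not the project's code. `grouper`, `run_batches`, `fake_persist` and the `"content_mimetype:add"` key are hypothetical stand-ins introduced for illustration, and the sketch assumes every persist call returns a one-key dict.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional


def grouper(iterable: Iterable[Any], n: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most n items (stand-in for utils.grouper)."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


def run_batches(
    results: Iterable[Any],
    persist: Callable[[List[Any]], Dict[str, int]],
    write_batch_size: int,
) -> Dict[str, Any]:
    """Persist results batch by batch and build a status summary."""
    summary: Dict[str, Any] = {"status": "uneventful"}
    count = 0
    count_key: Optional[str] = None
    for batch in grouper(results, write_batch_size):
        res = persist(batch)
        if count_key is None:
            # Assumption: the persist result holds exactly one counter key.
            count_key = next(iter(res))
        count += res[count_key]
    if count > 0 and count_key:
        summary["status"] = "eventful"
        summary[count_key] = count
    return summary


def fake_persist(batch: List[Any]) -> Dict[str, int]:
    """Hypothetical storage call: pretend every row in the batch was added."""
    return {"content_mimetype:add": len(batch)}
```

Calling `run_batches(range(5), fake_persist, 2)` would go through three batches (2, 2 and 1 items) and report the summed count under the persist key.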
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
index 3185534..aa5fb4e 100644
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,164 +1,163 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
import magic
+from swh.core.api.classes import stream_results
from swh.core.config import merge_configs
-from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1
+from swh.indexer.storage.interface import IndexerStorageInterface, Sha1
from swh.indexer.storage.model import ContentMimetypeRow
from .indexer import ContentIndexer, ContentPartitionIndexer
if not hasattr(magic.Magic, "from_buffer"):
raise ImportError(
'Expected "import magic" to import python-magic, but file_magic '
"was imported instead."
)
def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, str]:
"""Determine mimetype and encoding from the raw content.
Args:
raw_content: content's raw data
Returns:
dict: mimetype and encoding key and corresponding values.
"""
m = magic.Magic(mime=True, mime_encoding=True)
res = m.from_buffer(raw_content)
try:
mimetype, encoding = res.split("; charset=")
except ValueError:
mimetype, encoding = res, ""
return {
"mimetype": mimetype,
"encoding": encoding,
}
DEFAULT_CONFIG: Dict[str, Any] = {
"tools": {
"name": "file",
"version": "1:5.30-1+deb9u1",
"configuration": {"type": "library", "debian-package": "python3-magic"},
},
"write_batch_size": 1000,
}
class MixinMimetypeIndexer:
"""Mixin mimetype indexer.
See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer`
"""
tool: Any
idx_storage: IndexerStorageInterface
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = merge_configs(DEFAULT_CONFIG, self.config)
def index(
self, id: Sha1, data: Optional[bytes] = None, **kwargs
) -> List[ContentMimetypeRow]:
"""Compute the mimetype and encoding of a single content.
Args:
id: content's identifier
data: raw content in bytes
Returns:
a single-element list holding a :class:`ContentMimetypeRow` with the
content's identifier (sha1), its mimetype and its encoding (both str)
"""
assert data is not None
properties = compute_mimetype_encoding(data)
return [
ContentMimetypeRow(
id=id,
indexer_configuration_id=self.tool["id"],
mimetype=properties["mimetype"],
encoding=properties["encoding"],
)
]
def persist_index_computations(
self, results: List[ContentMimetypeRow]
) -> Dict[str, int]:
"""Persist the results in storage.
Args:
results: list of content mimetype rows
(see :meth:`.index`)
"""
return self.idx_storage.content_mimetype_add(results)
class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer[ContentMimetypeRow]):
"""Mimetype Indexer working on list of content identifiers.
It:
- (optionally) filters out content already indexed (cf.
:meth:`.filter`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
def filter(self, ids):
"""Filter out known sha1s and return only missing ones.
"""
yield from self.idx_storage.content_mimetype_missing(
({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
)
class MimetypePartitionIndexer(
MixinMimetypeIndexer, ContentPartitionIndexer[ContentMimetypeRow]
):
"""Mimetype Range Indexer working on range of content identifiers.
It:
- (optionally) filters out content already indexed (cf
:meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
def indexed_contents_in_partition(
- self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None,
- ) -> PagedResult[Sha1]:
+ self, partition_id: int, nb_partitions: int,
+ ) -> Iterable[Sha1]:
"""Retrieve indexed content ids within partition_id.
Args:
partition_id: Index of the partition to fetch
nb_partitions: Total number of partitions to split into
-
- Returns:
- PagedResult of Sha1. If next_page_token is None, there is no more data
- to fetch
-
"""
- return self.idx_storage.content_mimetype_get_partition(
- self.tool["id"], partition_id, nb_partitions, page_token=page_token
+ return stream_results(
+ self.idx_storage.content_mimetype_get_partition,
+ self.tool["id"],
+ partition_id,
+ nb_partitions,
)
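Reviewer note on the switch to `stream_results` above: the method no longer returns a single page but an iterable that walks every page of the partition. The sketch below illustrates that pagination pattern under stated assumptions. It is not the `swh.core` implementation; `Page`, `stream_pages` and `fake_get_partition` are hypothetical names, and the page object is assumed to expose `results` and `next_page_token`, as the removed docstring describes for `PagedResult`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterator, List, Optional


@dataclass
class Page:
    """Hypothetical stand-in for a paged result."""

    results: List[bytes]
    next_page_token: Optional[str] = None


def stream_pages(fetch: Callable[..., Page], *args: Any, **kwargs: Any) -> Iterator[bytes]:
    """Call a paginated endpoint repeatedly and yield results from all pages."""
    page_token: Optional[str] = None
    while True:
        page = fetch(*args, page_token=page_token, **kwargs)
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:
            # No more data to fetch.
            return


def fake_get_partition(
    tool_id: int,
    partition_id: int,
    nb_partitions: int,
    page_token: Optional[str] = None,
) -> Page:
    """Hypothetical endpoint serving five ids, two per page."""
    ids = [b"\x01", b"\x02", b"\x03", b"\x04", b"\x05"]
    start = int(page_token or 0)
    end = start + 2
    token = str(end) if end < len(ids) else None
    return Page(ids[start:end], token)
```

With this shape, `set(stream_pages(fake_get_partition, 1, 0, 4))` collects the already-indexed ids in one expression, which mirrors how `_index_with_skipping_already_done` consumes `indexed_contents_in_partition`.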
diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py
index ec2e084..6f41d44 100644
--- a/swh/indexer/storage/db.py
+++ b/swh/indexer/storage/db.py
@@ -1,550 +1,550 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, Iterable, Iterator, List
from swh.core.db import BaseDb
from swh.core.db.db_utils import execute_values_generator, stored_procedure
from swh.model import hashutil
from .interface import Sha1
class Db(BaseDb):
"""Proxy to the SWH Indexer DB, with wrappers around stored procedures
"""
content_mimetype_hash_keys = ["id", "indexer_configuration_id"]
def _missing_from_list(
self, table: str, data: Iterable[Dict], hash_keys: List[str], cur=None
):
"""Read from table the data with hash_keys that are missing.
Args:
table: Table name (e.g content_mimetype, content_language,
etc...)
data: Iterable of dicts to read from
hash_keys: List of keys to read in the data dict.
Yields:
The data which is missing from the db.
"""
cur = self._cursor(cur)
keys = ", ".join(hash_keys)
equality = " AND ".join(("t.%s = c.%s" % (key, key)) for key in hash_keys)
yield from execute_values_generator(
cur,
"""
select %s from (values %%s) as t(%s)
where not exists (
select 1 from %s c
where %s
)
"""
% (keys, keys, table, equality),
(tuple(m[k] for k in hash_keys) for m in data),
)
def content_mimetype_missing_from_list(
self, mimetypes: Iterable[Dict], cur=None
) -> Iterator[Sha1]:
"""List missing mimetypes.
"""
yield from self._missing_from_list(
"content_mimetype", mimetypes, self.content_mimetype_hash_keys, cur=cur
)
content_mimetype_cols = [
"id",
"mimetype",
"encoding",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_content_mimetype")
def mktemp_content_mimetype(self, cur=None):
pass
def content_mimetype_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_content_mimetype_add()")
return cur.fetchone()[0]
def _convert_key(self, key, main_table="c"):
"""Convert keys according to specific use in the module.
Args:
key (str): Key expression to change according to the alias
used in the query
main_table (str): Alias to use for the main table. Default
to c for content_{something}.
Expected:
Tables content_{something} being aliased as 'c' (something
in {language, mimetype, ...}), table indexer_configuration
being aliased as 'i'.
"""
if key == "id":
return "%s.id" % main_table
elif key == "tool_id":
return "i.id as tool_id"
elif key == "license":
return (
"""
(
select name
from fossology_license
where id = %s.license_id
)
as licenses"""
% main_table
)
return key
def _get_from_list(self, table, ids, cols, cur=None, id_col="id"):
"""Fetches entries from the `table` such that their `id` field
(or whatever is given to `id_col`) is in `ids`.
Returns the columns `cols`.
The `cur` parameter is used to connect to the database.
"""
cur = self._cursor(cur)
keys = map(self._convert_key, cols)
query = """
select {keys}
from (values %s) as t(id)
inner join {table} c
on c.{id_col}=t.id
inner join indexer_configuration i
on c.indexer_configuration_id=i.id;
""".format(
keys=", ".join(keys), id_col=id_col, table=table
)
yield from execute_values_generator(cur, query, ((_id,) for _id in ids))
content_indexer_names = {
"mimetype": "content_mimetype",
"fossology_license": "content_fossology_license",
}
def content_get_range(
self,
content_type,
start,
end,
indexer_configuration_id,
limit=1000,
with_textual_data=False,
cur=None,
):
"""Retrieve contents with content_type, within range [start, end]
bound by limit and associated to the given indexer
configuration id.
When asking to work on textual content, that filters on the
mimetype table with any mimetype that is not binary.
"""
cur = self._cursor(cur)
table = self.content_indexer_names[content_type]
if with_textual_data:
extra = """inner join content_mimetype cm
on (t.id=cm.id and cm.mimetype like 'text/%%' and
%(start)s <= cm.id and cm.id <= %(end)s)
"""
else:
extra = ""
query = f"""select t.id
from {table} t
{extra}
where t.indexer_configuration_id=%(tool_id)s
and %(start)s <= t.id and t.id <= %(end)s
order by t.indexer_configuration_id, t.id
limit %(limit)s"""
cur.execute(
query,
{
"start": start,
"end": end,
"tool_id": indexer_configuration_id,
"limit": limit,
},
)
yield from cur
def content_mimetype_get_from_list(self, ids, cur=None):
yield from self._get_from_list(
"content_mimetype", ids, self.content_mimetype_cols, cur=cur
)
content_language_hash_keys = ["id", "indexer_configuration_id"]
def content_language_missing_from_list(self, languages, cur=None):
"""List missing languages.
"""
yield from self._missing_from_list(
"content_language", languages, self.content_language_hash_keys, cur=cur
)
content_language_cols = [
"id",
"lang",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_content_language")
def mktemp_content_language(self, cur=None):
pass
def content_language_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_content_language_add()")
return cur.fetchone()[0]
def content_language_get_from_list(self, ids, cur=None):
yield from self._get_from_list(
"content_language", ids, self.content_language_cols, cur=cur
)
content_ctags_hash_keys = ["id", "indexer_configuration_id"]
def content_ctags_missing_from_list(self, ctags, cur=None):
"""List missing ctags.
"""
yield from self._missing_from_list(
"content_ctags", ctags, self.content_ctags_hash_keys, cur=cur
)
content_ctags_cols = [
"id",
"name",
"kind",
"line",
"lang",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_content_ctags")
def mktemp_content_ctags(self, cur=None):
pass
def content_ctags_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_content_ctags_add()")
return cur.fetchone()[0]
def content_ctags_get_from_list(self, ids, cur=None):
cur = self._cursor(cur)
keys = map(self._convert_key, self.content_ctags_cols)
yield from execute_values_generator(
cur,
"""
select %s
from (values %%s) as t(id)
inner join content_ctags c
on c.id=t.id
inner join indexer_configuration i
on c.indexer_configuration_id=i.id
order by line
"""
% ", ".join(keys),
((_id,) for _id in ids),
)
def content_ctags_search(self, expression, last_sha1, limit, cur=None):
cur = self._cursor(cur)
if not last_sha1:
query = """SELECT %s
FROM swh_content_ctags_search(%%s, %%s)""" % (
",".join(self.content_ctags_cols)
)
cur.execute(query, (expression, limit))
else:
if last_sha1 and isinstance(last_sha1, bytes):
last_sha1 = "\\x%s" % hashutil.hash_to_hex(last_sha1)
elif last_sha1:
last_sha1 = "\\x%s" % last_sha1
query = """SELECT %s
FROM swh_content_ctags_search(%%s, %%s, %%s)""" % (
",".join(self.content_ctags_cols)
)
cur.execute(query, (expression, limit, last_sha1))
yield from cur
content_fossology_license_cols = [
"id",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
"license",
]
@stored_procedure("swh_mktemp_content_fossology_license")
def mktemp_content_fossology_license(self, cur=None):
pass
def content_fossology_license_add_from_temp(self, cur=None):
"""Add new licenses per content.
"""
cur = self._cursor(cur)
cur.execute("select * from swh_content_fossology_license_add()")
return cur.fetchone()[0]
def content_fossology_license_get_from_list(self, ids, cur=None):
"""Retrieve licenses per id.
"""
cur = self._cursor(cur)
keys = map(self._convert_key, self.content_fossology_license_cols)
yield from execute_values_generator(
cur,
"""
select %s
from (values %%s) as t(id)
inner join content_fossology_license c on t.id=c.id
inner join indexer_configuration i
on i.id=c.indexer_configuration_id
"""
% ", ".join(keys),
((_id,) for _id in ids),
)
content_metadata_hash_keys = ["id", "indexer_configuration_id"]
def content_metadata_missing_from_list(self, metadata, cur=None):
"""List missing metadata.
"""
yield from self._missing_from_list(
"content_metadata", metadata, self.content_metadata_hash_keys, cur=cur
)
content_metadata_cols = [
"id",
"metadata",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_content_metadata")
def mktemp_content_metadata(self, cur=None):
pass
def content_metadata_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_content_metadata_add()")
return cur.fetchone()[0]
def content_metadata_get_from_list(self, ids, cur=None):
yield from self._get_from_list(
"content_metadata", ids, self.content_metadata_cols, cur=cur
)
revision_intrinsic_metadata_hash_keys = ["id", "indexer_configuration_id"]
def revision_intrinsic_metadata_missing_from_list(self, metadata, cur=None):
"""List missing metadata.
"""
yield from self._missing_from_list(
"revision_intrinsic_metadata",
metadata,
self.revision_intrinsic_metadata_hash_keys,
cur=cur,
)
revision_intrinsic_metadata_cols = [
"id",
"metadata",
"mappings",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_revision_intrinsic_metadata")
def mktemp_revision_intrinsic_metadata(self, cur=None):
pass
def revision_intrinsic_metadata_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_revision_intrinsic_metadata_add()")
return cur.fetchone()[0]
def revision_intrinsic_metadata_get_from_list(self, ids, cur=None):
yield from self._get_from_list(
"revision_intrinsic_metadata",
ids,
self.revision_intrinsic_metadata_cols,
cur=cur,
)
origin_intrinsic_metadata_cols = [
"id",
"metadata",
"from_revision",
"mappings",
"tool_id",
"tool_name",
"tool_version",
"tool_configuration",
]
origin_intrinsic_metadata_regconfig = "pg_catalog.simple"
"""The dictionary used to normalize 'metadata' and queries.
'pg_catalog.simple' provides no stopwords, so it should be suitable
for proper names and non-English content.
When updating this value, make sure to add a new index on
origin_intrinsic_metadata.metadata."""
@stored_procedure("swh_mktemp_origin_intrinsic_metadata")
def mktemp_origin_intrinsic_metadata(self, cur=None):
pass
def origin_intrinsic_metadata_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("select * from swh_origin_intrinsic_metadata_add()")
return cur.fetchone()[0]
def origin_intrinsic_metadata_get_from_list(self, ids, cur=None):
yield from self._get_from_list(
"origin_intrinsic_metadata",
ids,
self.origin_intrinsic_metadata_cols,
cur=cur,
id_col="id",
)
def origin_intrinsic_metadata_search_fulltext(self, terms, *, limit, cur):
regconfig = self.origin_intrinsic_metadata_regconfig
tsquery_template = " && ".join(
"plainto_tsquery('%s', %%s)" % regconfig for _ in terms
)
tsquery_args = [(term,) for term in terms]
keys = (
self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols
)
query = (
"SELECT {keys} FROM origin_intrinsic_metadata AS oim "
"INNER JOIN indexer_configuration AS i "
"ON oim.indexer_configuration_id=i.id "
"JOIN LATERAL (SELECT {tsquery_template}) AS s(tsq) ON true "
"WHERE oim.metadata_tsvector @@ tsq "
"ORDER BY ts_rank(oim.metadata_tsvector, tsq, 1) DESC "
"LIMIT %s;"
).format(keys=", ".join(keys), tsquery_template=tsquery_template)
cur.execute(query, tsquery_args + [limit])
yield from cur
def origin_intrinsic_metadata_search_by_producer(
self, last, limit, ids_only, mappings, tool_ids, cur
):
if ids_only:
keys = "oim.id"
else:
keys = ", ".join(
(
self._convert_key(col, "oim")
for col in self.origin_intrinsic_metadata_cols
)
)
query_parts = [
"SELECT %s" % keys,
"FROM origin_intrinsic_metadata AS oim",
"INNER JOIN indexer_configuration AS i",
"ON oim.indexer_configuration_id=i.id",
]
args = []
where = []
if last:
where.append("oim.id > %s")
args.append(last)
if mappings is not None:
where.append("oim.mappings && %s")
- args.append(mappings)
+ args.append(list(mappings))
if tool_ids is not None:
where.append("oim.indexer_configuration_id = ANY(%s)")
- args.append(tool_ids)
+ args.append(list(tool_ids))
if where:
query_parts.append("WHERE")
query_parts.append(" AND ".join(where))
if limit:
query_parts.append("LIMIT %s")
args.append(limit)
cur.execute(" ".join(query_parts), args)
yield from cur
indexer_configuration_cols = [
"id",
"tool_name",
"tool_version",
"tool_configuration",
]
@stored_procedure("swh_mktemp_indexer_configuration")
def mktemp_indexer_configuration(self, cur=None):
pass
def indexer_configuration_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute(
"SELECT %s from swh_indexer_configuration_add()"
% (",".join(self.indexer_configuration_cols),)
)
yield from cur
def indexer_configuration_get(
self, tool_name, tool_version, tool_configuration, cur=None
):
cur = self._cursor(cur)
cur.execute(
"""select %s
from indexer_configuration
where tool_name=%%s and
tool_version=%%s and
tool_configuration=%%s"""
% (",".join(self.indexer_configuration_cols)),
(tool_name, tool_version, tool_configuration),
)
return cur.fetchone()
def indexer_configuration_get_from_id(self, id_, cur=None):
cur = self._cursor(cur)
cur.execute(
"""select %s
from indexer_configuration
where id=%%s"""
% (",".join(self.indexer_configuration_cols)),
(id_,),
)
return cur.fetchone()
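Reviewer note on `origin_intrinsic_metadata_search_by_producer` above: the change wraps `mappings` and `tool_ids` in `list()`, presumably so that the driver always receives a Python list (which psycopg2 adapts to a PostgreSQL array) even when the caller passes another iterable such as a set or tuple. That motivation is an inference from the diff, not something the diff states. The sketch below isolates the query assembly as a pure function so it can be inspected without a database; `build_search_by_producer_query` is a hypothetical helper, not part of the module.

```python
from typing import Any, Iterable, List, Optional, Tuple


def build_search_by_producer_query(
    last: Optional[str] = None,
    limit: Optional[int] = None,
    mappings: Optional[Iterable[str]] = None,
    tool_ids: Optional[Iterable[int]] = None,
    keys: str = "oim.id",
) -> Tuple[str, List[Any]]:
    """Return the SQL text and its positional arguments."""
    query_parts = [
        "SELECT %s" % keys,
        "FROM origin_intrinsic_metadata AS oim",
        "INNER JOIN indexer_configuration AS i",
        "ON oim.indexer_configuration_id=i.id",
    ]
    args: List[Any] = []
    where: List[str] = []
    if last:
        where.append("oim.id > %s")
        args.append(last)
    if mappings is not None:
        where.append("oim.mappings && %s")
        # Materialize as a list so it is passed as an array parameter.
        args.append(list(mappings))
    if tool_ids is not None:
        where.append("oim.indexer_configuration_id = ANY(%s)")
        args.append(list(tool_ids))
    if where:
        query_parts.append("WHERE")
        query_parts.append(" AND ".join(where))
    if limit:
        query_parts.append("LIMIT %s")
        args.append(limit)
    return " ".join(query_parts), args
```

Passing `mappings={"npm"}` and `tool_ids=(1, 2)` yields the arguments `["npm"]` and `[1, 2]`, so the `&&` and `ANY(...)` operators always see list parameters regardless of the caller's container type.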
diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py
index e16588b..270812d 100644
--- a/swh/indexer/tests/conftest.py
+++ b/swh/indexer/tests/conftest.py
@@ -1,105 +1,127 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import os
+from os import path
from typing import List, Tuple
from unittest.mock import patch
import pytest
import yaml
+from swh.core.db.pytest_plugin import postgresql_fact
+import swh.indexer
from swh.indexer.storage import get_indexer_storage
from swh.objstorage.factory import get_objstorage
from swh.storage import get_storage
from .utils import fill_obj_storage, fill_storage
TASK_NAMES: List[Tuple[str, str]] = [
# (scheduler-task-type, task-class-test-name)
("index-revision-metadata", "revision_intrinsic_metadata"),
("index-origin-metadata", "origin_intrinsic_metadata"),
]
+SQL_FILES = path.join(path.dirname(swh.indexer.__file__), "sql", "*.sql")
+
+
+idx_storage_postgresql = postgresql_fact(
+ "postgresql_proc", db_name="indexer_storage", dump_files=SQL_FILES,
+)
+
+
@pytest.fixture
def indexer_scheduler(swh_scheduler):
# Insert the expected task types within the scheduler
for task_name, task_class_name in TASK_NAMES:
swh_scheduler.create_task_type(
{
"type": task_name,
"description": f"The {task_class_name} indexer testing task",
"backend_name": f"swh.indexer.tests.tasks.{task_class_name}",
"default_interval": timedelta(days=1),
"min_interval": timedelta(hours=6),
"max_interval": timedelta(days=12),
"num_retries": 3,
}
)
return swh_scheduler
@pytest.fixture
-def idx_storage():
+def idx_storage_backend_config(idx_storage_postgresql):
+ """Basic pg storage configuration with no journal collaborator for the indexer
+ storage (to avoid pulling optional dependency on clients of this fixture)
+
+ """
+ return {
+ "cls": "local",
+ "db": idx_storage_postgresql.dsn,
+ }
+
+
+@pytest.fixture
+def swh_indexer_config(
+ swh_storage_backend_config, idx_storage_backend_config, swh_scheduler_config
+):
+ return {
+ "storage": swh_storage_backend_config,
+ "objstorage": {"cls": "memory"},
+ "indexer_storage": idx_storage_backend_config,
+ "scheduler": {"cls": "local", **swh_scheduler_config},
+ "tools": {
+ "name": "file",
+ "version": "1:5.30-1+deb9u1",
+ "configuration": {"type": "library", "debian-package": "python3-magic"},
+ },
+ "compute_checksums": ["blake2b512"], # for rehash indexer
+ }
+
+
+@pytest.fixture
+def idx_storage(swh_indexer_config):
"""An instance of the indexer storage configured in swh_indexer_config,
injected into all indexer classes.
"""
- idx_storage = get_indexer_storage("memory")
- with patch("swh.indexer.storage.in_memory.IndexerStorage") as idx_storage_mock:
- idx_storage_mock.return_value = idx_storage
- yield idx_storage
+ idx_storage_config = swh_indexer_config["indexer_storage"]
+ return get_indexer_storage(**idx_storage_config)
@pytest.fixture
-def storage():
+def storage(swh_indexer_config):
"""An instance of the storage configured in swh_indexer_config, injected
into all indexer classes.
"""
- storage = get_storage(cls="memory")
+ storage = get_storage(**swh_indexer_config["storage"])
fill_storage(storage)
- with patch("swh.storage.in_memory.InMemoryStorage") as storage_mock:
- storage_mock.return_value = storage
- yield storage
+ return storage
@pytest.fixture
-def obj_storage():
+def obj_storage(swh_indexer_config):
"""An instance of in-memory objstorage that gets injected into all indexers
classes.
"""
- objstorage = get_objstorage("memory")
+ objstorage = get_objstorage(**swh_indexer_config["objstorage"])
fill_obj_storage(objstorage)
with patch.dict(
"swh.objstorage.factory._STORAGE_CLASSES", {"memory": lambda: objstorage}
):
yield objstorage
-@pytest.fixture
-def swh_indexer_config():
- return {
- "storage": {"cls": "memory"},
- "objstorage": {"cls": "memory"},
- "indexer_storage": {"cls": "memory"},
- "tools": {
- "name": "file",
- "version": "1:5.30-1+deb9u1",
- "configuration": {"type": "library", "debian-package": "python3-magic"},
- },
- "compute_checksums": ["blake2b512"], # for rehash indexer
- }
-
-
@pytest.fixture
def swh_config(swh_indexer_config, monkeypatch, tmp_path):
conffile = os.path.join(str(tmp_path), "indexer.yml")
with open(conffile, "w") as f:
f.write(yaml.dump(swh_indexer_config))
monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile)
return conffile
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 3deca15..265fbea 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,397 +1,493 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import datetime
from functools import reduce
import re
-import tempfile
from typing import Any, Dict, List
from unittest.mock import patch
from click.testing import CliRunner
-from confluent_kafka import Consumer, Producer
+from confluent_kafka import Consumer
import pytest
from swh.indexer.cli import indexer_cli_group
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
OriginIntrinsicMetadataRow,
RevisionIntrinsicMetadataRow,
)
-from swh.journal.serializers import value_to_kafka
+from swh.journal.writer import get_journal_writer
from swh.model.hashutil import hash_to_bytes
-
-CLI_CONFIG = """
-scheduler:
- cls: foo
- args: {}
-storage:
- cls: memory
-indexer_storage:
- cls: memory
-"""
+from swh.model.model import OriginVisitStatus
def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
tools: List[Dict[str, Any]] = [
{"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},}
for i in range(2)
]
tools = idx_storage.indexer_configuration_add(tools)
origin_metadata = [
OriginIntrinsicMetadataRow(
id="file://dev/%04d" % origin_id,
- from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)),
+ from_revision=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
revision_metadata = [
RevisionIntrinsicMetadataRow(
- id=hash_to_bytes("abcd{:0>4}".format(origin_id)),
+ id=hash_to_bytes("abcd{:0>36}".format(origin_id)),
indexer_configuration_id=tools[origin_id % 2]["id"],
metadata={"name": "origin %d" % origin_id},
mappings=["mapping%d" % (origin_id % 10)],
)
for origin_id in range(nb_rows)
]
idx_storage.revision_intrinsic_metadata_add(revision_metadata)
idx_storage.origin_intrinsic_metadata_add(origin_metadata)
return [tool["id"] for tool in tools]
def _origins_in_task_args(tasks):
"""Returns the set of origins contained in the arguments of the
provided tasks (assumed to be of type index-origin-metadata)."""
return reduce(
set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
)
def _assert_tasks_for_origins(tasks, origins):
expected_kwargs = {}
assert {task["type"] for task in tasks} == {"index-origin-metadata"}
assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
for task in tasks:
assert task["arguments"]["kwargs"] == expected_kwargs, task
assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins])
-def invoke(scheduler, catch_exceptions, args):
- runner = CliRunner()
- with patch(
- "swh.scheduler.get_scheduler"
- ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
- "a", suffix=".yml"
- ) as config_fd:
- config_fd.write(CLI_CONFIG)
- config_fd.seek(0)
- get_scheduler_mock.return_value = scheduler
- result = runner.invoke(indexer_cli_group, ["-C" + config_fd.name] + args)
- if not catch_exceptions and result.exception:
- print(result.output)
- raise result.exception
- return result
-
-
-def test_mapping_list(indexer_scheduler):
- result = invoke(indexer_scheduler, False, ["mapping", "list",])
+@pytest.fixture
+def cli_runner():
+ return CliRunner()
+
+
+def test_cli_mapping_list(cli_runner, swh_config):
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "mapping", "list"],
+ catch_exceptions=False,
+ )
expected_output = "\n".join(
["codemeta", "gemspec", "maven", "npm", "pkg-info", "",]
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
-def test_mapping_list_terms(indexer_scheduler):
- result = invoke(indexer_scheduler, False, ["mapping", "list-terms",])
+def test_cli_mapping_list_terms(cli_runner, swh_config):
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "mapping", "list-terms"],
+ catch_exceptions=False,
+ )
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
-def test_mapping_list_terms_exclude(indexer_scheduler):
- result = invoke(
- indexer_scheduler,
- False,
- ["mapping", "list-terms", "--exclude-mapping", "codemeta"],
+def test_cli_mapping_list_terms_exclude(cli_runner, swh_config):
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"],
+ catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert re.search(r"http://schema.org/url:\n.*npm", result.output)
assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
assert not re.search(
r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
result.output,
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage):
- result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
+def test_cli_origin_metadata_reindex_empty_db(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "schedule", "reindex_origin_metadata",],
+ catch_exceptions=False,
+ )
expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_divisor(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
fill_idx_storage(idx_storage, 90)
- result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "schedule", "reindex_origin_metadata",],
+ catch_exceptions=False,
+ )
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 9
_assert_tasks_for_origins(tasks, range(90))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_dry_run(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
"""Tests that --dry-run reports the tasks that would be scheduled
without actually scheduling any of them."""
fill_idx_storage(idx_storage, 90)
- result = invoke(
- indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",]
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata",],
+ catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (60 origins).\n"
"Scheduled 9 tasks (90 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 0
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_nondivisor(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
"""Tests the re-indexing when neither origin_batch_size nor
task_batch_size is a divisor of nb_origins."""
fill_idx_storage(idx_storage, 70)
- result = invoke(
- indexer_scheduler,
- False,
- ["schedule", "reindex_origin_metadata", "--batch-size", "20",],
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ [
+ "-C",
+ swh_config,
+ "schedule",
+ "reindex_origin_metadata",
+ "--batch-size",
+ "20",
+ ],
+ catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (60 origins).\n"
"Scheduled 4 tasks (70 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 4
_assert_tasks_for_origins(tasks, range(70))
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_filter_one_mapping(
- indexer_scheduler, idx_storage, storage
+def test_cli_origin_metadata_reindex_filter_one_mapping(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing of origins filtered on a single mapping."""
fill_idx_storage(idx_storage, 110)
- result = invoke(
- indexer_scheduler,
- False,
- ["schedule", "reindex_origin_metadata", "--mapping", "mapping1",],
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ [
+ "-C",
+ swh_config,
+ "schedule",
+ "reindex_origin_metadata",
+ "--mapping",
+ "mapping1",
+ ],
+ catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 2
_assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_filter_two_mappings(
- indexer_scheduler, idx_storage, storage
+def test_cli_origin_metadata_reindex_filter_two_mappings(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing of origins filtered on two mappings."""
fill_idx_storage(idx_storage, 110)
- result = invoke(
- indexer_scheduler,
- False,
+ result = cli_runner.invoke(
+ indexer_cli_group,
[
+ "--config-file",
+ swh_config,
"schedule",
"reindex_origin_metadata",
"--mapping",
"mapping1",
"--mapping",
"mapping2",
],
+ catch_exceptions=False,
)
# Check the output
expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 3
_assert_tasks_for_origins(
tasks,
[
1,
11,
21,
31,
41,
51,
61,
71,
81,
91,
101,
2,
12,
22,
32,
42,
52,
62,
72,
82,
92,
102,
],
)
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_filter_one_tool(
- indexer_scheduler, idx_storage, storage
+def test_cli_origin_metadata_reindex_filter_one_tool(
+ cli_runner, swh_config, indexer_scheduler, idx_storage, storage
):
"""Tests the re-indexing of origins filtered on a single tool id."""
tool_ids = fill_idx_storage(idx_storage, 110)
- result = invoke(
- indexer_scheduler,
- False,
- ["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),],
+ result = cli_runner.invoke(
+ indexer_cli_group,
+ [
+ "-C",
+ swh_config,
+ "schedule",
+ "reindex_origin_metadata",
+ "--tool-id",
+ str(tool_ids[0]),
+ ],
+ catch_exceptions=False,
)
# Check the output
expected_output = (
"Scheduled 3 tasks (30 origins).\n"
"Scheduled 6 tasks (55 origins).\n"
"Done.\n"
)
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
tasks = indexer_scheduler.search_tasks()
assert len(tasks) == 6
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
-def test_journal_client(
- storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
+def now():
+ return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
+def test_cli_journal_client(
+ cli_runner,
+ swh_config,
+ indexer_scheduler,
+ kafka_prefix: str,
+ kafka_server,
+ consumer: Consumer,
):
"""Test the 'swh indexer journal-client' cli tool."""
- producer = Producer(
- {
- "bootstrap.servers": kafka_server,
- "client.id": "test producer",
- "acks": "all",
- }
+ journal_writer = get_journal_writer(
+ "kafka",
+ brokers=[kafka_server],
+ prefix=kafka_prefix,
+ client_id="test producer",
+ value_sanitizer=lambda object_type, value: value,
+ flush_timeout=3, # fail early if something is going wrong
)
- STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
- producer.produce(
- topic=f"{kafka_prefix}.origin_visit_status",
- key=b"bogus",
- value=value_to_kafka(STATUS),
- )
+ visit_statuses = [
+ OriginVisitStatus(
+ origin="file:///dev/zero",
+ visit=1,
+ date=now(),
+ status="full",
+ snapshot=None,
+ ),
+ OriginVisitStatus(
+ origin="file:///dev/foobar",
+ visit=2,
+ date=now(),
+ status="full",
+ snapshot=None,
+ ),
+ OriginVisitStatus(
+ origin="file:///tmp/spamegg",
+ visit=3,
+ date=now(),
+ status="full",
+ snapshot=None,
+ ),
+ OriginVisitStatus(
+ origin="file:///dev/0002",
+ visit=6,
+ date=now(),
+ status="full",
+ snapshot=None,
+ ),
+ OriginVisitStatus( # will be filtered out due to its 'partial' status
+ origin="file:///dev/0000",
+ visit=4,
+ date=now(),
+ status="partial",
+ snapshot=None,
+ ),
+ OriginVisitStatus( # will be filtered out due to its 'ongoing' status
+ origin="file:///dev/0001",
+ visit=5,
+ date=now(),
+ status="ongoing",
+ snapshot=None,
+ ),
+ ]
+
+ journal_writer.write_additions("origin_visit_status", visit_statuses)
+ visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
- result = invoke(
- indexer_scheduler,
- False,
+ result = cli_runner.invoke(
+ indexer_cli_group,
[
+ "-C",
+ swh_config,
"journal-client",
- "--stop-after-objects",
- "1",
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
+ "--stop-after-objects",
+ str(len(visit_statuses)),
+ "--origin-metadata-task-type",
+ "index-origin-metadata",
],
+ catch_exceptions=False,
)
# Check the output
expected_output = "Done.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
# Check scheduled tasks
- tasks = indexer_scheduler.search_tasks()
- assert len(tasks) == 1
- _assert_tasks_for_origins(tasks, [0])
+ tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
+
+ # This can be split into multiple tasks but no more than the origin-visit-statuses
+ # written in the journal
+ assert len(tasks) <= len(visit_statuses_full)
+
+ actual_origins = []
+ for task in tasks:
+ actual_task = dict(task)
+ assert actual_task["type"] == "index-origin-metadata"
+ scheduled_origins = actual_task["arguments"]["args"][0]
+ actual_origins.extend(scheduled_origins)
+
+ assert set(actual_origins) == {vs.origin for vs in visit_statuses_full}
-def test_journal_client_without_brokers(
- storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
+def test_cli_journal_client_without_brokers(
+ cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer
):
"""Without brokers configuration, the cli fails."""
with pytest.raises(ValueError, match="brokers"):
- invoke(
- indexer_scheduler, False, ["journal-client",],
+ cli_runner.invoke(
+ indexer_cli_group,
+ ["-C", swh_config, "journal-client",],
+ catch_exceptions=False,
)
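Reviewer note (not part of the patch): the expected outputs in the reindex tests above follow from simple batching arithmetic. The sketch below reproduces the progress lines under two assumptions inferred from the tests: origins are grouped into tasks of `origin_batch_size` (10 by default, as the 90-origin test suggests), and one progress line is printed after every `TASK_BATCH_SIZE` tasks. The helpers `chunks` and `schedule_progress` are hypothetical names for illustration. This is not the swh.scheduler implementation.

```python
from typing import Iterator, List, Sequence


def chunks(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def schedule_progress(
    nb_origins: int, origin_batch_size: int = 10, task_batch_size: int = 3
) -> List[str]:
    """Return the progress lines the tests expect for `nb_origins` origins."""
    origins = list(range(nb_origins))
    if not origins:
        return ["Nothing to do (no origin metadata matched the criteria)."]

    # One task per group of `origin_batch_size` origins.
    tasks = list(chunks(origins, origin_batch_size))

    lines = []
    nb_tasks = 0
    nb_scheduled_origins = 0
    # Tasks are flushed `task_batch_size` at a time; counters are cumulative.
    for task_batch in chunks(tasks, task_batch_size):
        nb_tasks += len(task_batch)
        nb_scheduled_origins += sum(len(task) for task in task_batch)
        lines.append(
            f"Scheduled {nb_tasks} tasks ({nb_scheduled_origins} origins)."
        )
    lines.append("Done.")
    return lines
```

With the patched batch size of 3, `schedule_progress(70, origin_batch_size=20)` gives the two progress lines of the non-divisor test, and `schedule_progress(55)` gives those of the one-tool filter test.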
diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py
index 10e7155..6edaa91 100644
--- a/swh/indexer/tests/test_indexer.py
+++ b/swh/indexer/tests/test_indexer.py
@@ -1,106 +1,152 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
from unittest.mock import Mock
import pytest
from swh.indexer.indexer import (
ContentIndexer,
ContentPartitionIndexer,
OriginIndexer,
RevisionIndexer,
)
from swh.indexer.storage import PagedResult, Sha1
+from swh.model.model import Content
from .utils import BASE_TEST_CONFIG
class _TestException(Exception):
pass
class CrashingIndexerMixin:
USE_TOOLS = False
def index(
self, id: Any, data: Optional[Any] = None, **kwargs
) -> List[Dict[str, Any]]:
raise _TestException()
def persist_index_computations(self, results) -> Dict[str, int]:
return {}
def indexed_contents_in_partition(
- self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
- ) -> PagedResult[Sha1]:
+ self, partition_id: int, nb_partitions: int
+ ) -> Iterable[Sha1]:
raise _TestException()
class CrashingContentIndexer(CrashingIndexerMixin, ContentIndexer):
pass
class CrashingContentPartitionIndexer(CrashingIndexerMixin, ContentPartitionIndexer):
pass
class CrashingRevisionIndexer(CrashingIndexerMixin, RevisionIndexer):
pass
class CrashingOriginIndexer(CrashingIndexerMixin, OriginIndexer):
pass
+class TrivialContentPartitionIndexer(ContentPartitionIndexer[str]):
+ USE_TOOLS = False
+
+ def index(self, id: bytes, data: Optional[bytes], **kwargs) -> List[str]:
+ return ["indexed " + id.decode()]
+
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int
+ ) -> Iterable[Sha1]:
+ return iter([b"excluded hash", b"other excluded hash"])
+
+ def persist_index_computations(self, results: List[str]) -> Dict[str, int]:
+ self._results.append(results) # type: ignore
+ return {"nb_added": len(results)}
+
+
def test_content_indexer_catch_exceptions():
indexer = CrashingContentIndexer(config=BASE_TEST_CONFIG)
indexer.objstorage = Mock()
indexer.objstorage.get.return_value = b"content"
assert indexer.run([b"foo"]) == {"status": "failed"}
indexer.catch_exceptions = False
with pytest.raises(_TestException):
indexer.run([b"foo"])
def test_revision_indexer_catch_exceptions():
indexer = CrashingRevisionIndexer(config=BASE_TEST_CONFIG)
indexer.storage = Mock()
indexer.storage.revision_get.return_value = ["rev"]
assert indexer.run([b"foo"]) == {"status": "failed"}
indexer.catch_exceptions = False
with pytest.raises(_TestException):
indexer.run([b"foo"])
def test_origin_indexer_catch_exceptions():
indexer = CrashingOriginIndexer(config=BASE_TEST_CONFIG)
assert indexer.run(["http://example.org"]) == {"status": "failed"}
indexer.catch_exceptions = False
with pytest.raises(_TestException):
indexer.run(["http://example.org"])
def test_content_partition_indexer_catch_exceptions():
indexer = CrashingContentPartitionIndexer(
config={**BASE_TEST_CONFIG, "write_batch_size": 42}
)
assert indexer.run(0, 42) == {"status": "failed"}
indexer.catch_exceptions = False
with pytest.raises(_TestException):
indexer.run(0, 42)
+
+
+def test_content_partition_indexer():
+ # TODO: simplify the mocking in this test
+ indexer = TrivialContentPartitionIndexer(
+ config={**BASE_TEST_CONFIG, "write_batch_size": 10,} # doesn't matter
+ )
+ indexer.catch_exceptions = False
+ indexer._results = []
+ indexer.storage = Mock()
+ indexer.storage.content_get_partition = lambda *args, **kwargs: PagedResult(
+ results=[
+ Content(sha1=c, sha1_git=c, sha256=c, blake2s256=c, length=42)
+ for c in [
+ b"hash1",
+ b"excluded hash",
+ b"hash2",
+ b"other excluded hash",
+ b"hash3",
+ ]
+ ],
+ next_page_token=None,
+ )
+ indexer.objstorage = Mock()
+ indexer.objstorage.get = lambda id: b"foo"
+ nb_partitions = 1
+ partition_id = 0
+ indexer.run(partition_id, nb_partitions)
+ assert indexer._results == [["indexed hash1", "indexed hash2", "indexed hash3"]]
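Reviewer note (not part of the patch): test_content_partition_indexer checks that contents already reported by indexed_contents_in_partition are skipped and that the rest are indexed in order. A minimal sketch of that behaviour, as inferred from the test only, could look like the following. `index_partition` is a hypothetical helper, not the ContentPartitionIndexer code.

```python
from typing import Callable, Iterable, List


def index_partition(
    partition_hashes: Iterable[bytes],
    already_indexed: Iterable[bytes],
    index_one: Callable[[bytes], str],
) -> List[str]:
    """Index every hash of the partition that is not already indexed."""
    excluded = set(already_indexed)
    # Keep the partition order; skip anything the indexer already knows.
    return [index_one(h) for h in partition_hashes if h not in excluded]


results = index_partition(
    [b"hash1", b"excluded hash", b"hash2", b"other excluded hash", b"hash3"],
    [b"excluded hash", b"other excluded hash"],
    lambda h: "indexed " + h.decode(),
)
```

The test wraps this list in an outer list because the results are persisted in a single write batch.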
diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py
index 21e5e0b..b3e4140 100644
--- a/swh/indexer/tests/test_journal_client.py
+++ b/swh/indexer/tests/test_journal_client.py
@@ -1,132 +1,116 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Dict, List
+from unittest.mock import patch
-from unittest.mock import Mock, patch
+import pytest
from swh.indexer.journal_client import process_journal_objects
+from swh.scheduler.interface import SchedulerInterface
-def test_one_origin_visit_status():
- mock_scheduler = Mock()
- messages = {
- "origin_visit_status": [{"status": "full", "origin": "file:///dev/zero",},]
- }
- process_journal_objects(
- messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"},
- )
- assert mock_scheduler.create_tasks.called is True
- call_args = mock_scheduler.create_tasks.call_args
- (args, kwargs) = call_args
- assert kwargs == {}
- del args[0][0]["next_run"]
- assert args == (
- [
- {
- "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),},
- "policy": "oneshot",
- "type": "task-name",
- "retries_left": 1,
- },
- ],
- )
+def search_tasks(indexer_scheduler: SchedulerInterface, task_type) -> List[Dict]:
+ tasks = indexer_scheduler.search_tasks(task_type=task_type)
+ keys_not_to_compare = ["next_run", "current_interval", "id", "priority", "status"]
-def test_origin_visit_legacy():
- mock_scheduler = Mock()
- messages = {
- "origin_visit_status": [
- {"status": "full", "origin": {"url": "file:///dev/zero",}},
- ]
- }
+ result_tasks = []
+ for task in tasks:
+ task = dict(task)
+
+ for key in keys_not_to_compare:
+ del task[key]
+
+ result_tasks.append(task)
+
+ return result_tasks
+
+
+@pytest.mark.parametrize(
+ "origin",
+ [
+ "file:///dev/zero", # current format
+ {"url": "file:///dev/zero",}, # legacy format
+ ],
+)
+def test_journal_client_origin_visit_status(origin, indexer_scheduler):
+ messages = {"origin_visit_status": [{"status": "full", "origin": origin},]}
process_journal_objects(
- messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"},
- )
- assert mock_scheduler.create_tasks.called is True
- call_args = mock_scheduler.create_tasks.call_args
- (args, kwargs) = call_args
- assert kwargs == {}
- del args[0][0]["next_run"]
- assert args == (
- [
- {
- "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),},
- "policy": "oneshot",
- "type": "task-name",
- "retries_left": 1,
- },
- ],
+ messages,
+ scheduler=indexer_scheduler,
+ task_names={"origin_metadata": "index-origin-metadata"},
)
+ actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata")
+ assert actual_tasks == [
+ {
+ "arguments": {"kwargs": {}, "args": [["file:///dev/zero"]],},
+ "policy": "oneshot",
+ "type": "index-origin-metadata",
+ "retries_left": 1,
+ }
+ ]
-def test_one_origin_visit_batch():
- mock_scheduler = Mock()
+
+def test_journal_client_one_origin_visit_batch(indexer_scheduler):
messages = {
"origin_visit_status": [
{"status": "full", "origin": "file:///dev/zero",},
{"status": "full", "origin": "file:///tmp/foobar",},
]
}
process_journal_objects(
- messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"},
+ messages,
+ scheduler=indexer_scheduler,
+ task_names={"origin_metadata": "index-origin-metadata"},
)
- assert mock_scheduler.create_tasks.called is True
- call_args = mock_scheduler.create_tasks.call_args
- (args, kwargs) = call_args
- assert kwargs == {}
- del args[0][0]["next_run"]
- assert args == (
- [
- {
- "arguments": {
- "kwargs": {},
- "args": (["file:///dev/zero", "file:///tmp/foobar"],),
- },
- "policy": "oneshot",
- "type": "task-name",
- "retries_left": 1,
+
+ actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata")
+ assert actual_tasks == [
+ {
+ "arguments": {
+ "kwargs": {},
+ "args": [["file:///dev/zero", "file:///tmp/foobar"]],
},
- ],
- )
+ "policy": "oneshot",
+ "type": "index-origin-metadata",
+ "retries_left": 1,
+ }
+ ]
@patch("swh.indexer.journal_client.MAX_ORIGINS_PER_TASK", 2)
-def test_origin_visit_batches():
- mock_scheduler = Mock()
+def test_journal_client_origin_visit_batches(indexer_scheduler):
messages = {
"origin_visit_status": [
{"status": "full", "origin": "file:///dev/zero",},
{"status": "full", "origin": "file:///tmp/foobar",},
{"status": "full", "origin": "file:///tmp/spamegg",},
]
}
process_journal_objects(
- messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"},
+ messages,
+ scheduler=indexer_scheduler,
+ task_names={"origin_metadata": "index-origin-metadata"},
)
- assert mock_scheduler.create_tasks.called is True
- call_args = mock_scheduler.create_tasks.call_args
- (args, kwargs) = call_args
- assert kwargs == {}
- del args[0][0]["next_run"]
- del args[0][1]["next_run"]
- assert args == (
- [
- {
- "arguments": {
- "kwargs": {},
- "args": (["file:///dev/zero", "file:///tmp/foobar"],),
- },
- "policy": "oneshot",
- "type": "task-name",
- "retries_left": 1,
+ actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata")
+ assert actual_tasks == [
+ {
+ "arguments": {
+ "kwargs": {},
+ "args": [["file:///dev/zero", "file:///tmp/foobar"],],
},
- {
- "arguments": {"kwargs": {}, "args": (["file:///tmp/spamegg"],),},
- "policy": "oneshot",
- "type": "task-name",
- "retries_left": 1,
- },
- ],
- )
+ "policy": "oneshot",
+ "type": "index-origin-metadata",
+ "retries_left": 1,
+ },
+ {
+ "arguments": {"kwargs": {}, "args": [["file:///tmp/spamegg"]],},
+ "policy": "oneshot",
+ "type": "index-origin-metadata",
+ "retries_left": 1,
+ },
+ ]
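Reviewer note (not part of the patch): the journal client tests above rely on three behaviours: the legacy `{"url": ...}` origin format is normalised to a plain URL, only visits with status "full" are kept (as the comments in the CLI test suggest), and the URLs are split into tasks of at most MAX_ORIGINS_PER_TASK origins. The sketch below illustrates them with hypothetical helpers `origin_url` and `batch_full_visits`. It is an assumption-laden illustration, not the code of process_journal_objects.

```python
from typing import Any, Dict, List, Union


def origin_url(origin: Union[str, Dict[str, str]]) -> str:
    """Accept both the current (str) and the legacy (dict) origin format."""
    return origin["url"] if isinstance(origin, dict) else origin


def batch_full_visits(
    visit_statuses: List[Dict[str, Any]], max_origins_per_task: int
) -> List[List[str]]:
    """Group the origins of 'full' visits into per-task lists."""
    urls = [
        origin_url(vs["origin"])
        for vs in visit_statuses
        if vs["status"] == "full"
    ]
    return [
        urls[i:i + max_origins_per_task]
        for i in range(0, len(urls), max_origins_per_task)
    ]
```

Each inner list corresponds to the first positional argument of one scheduled task, that is, `task["arguments"]["args"][0]` in the assertions above.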
diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py
index 3ad457c..67b5b05 100644
--- a/swh/indexer/tests/test_origin_head.py
+++ b/swh/indexer/tests/test_origin_head.py
@@ -1,190 +1,176 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
from datetime import datetime, timezone
import unittest
import pytest
from swh.indexer.origin_head import OriginHeadIndexer
from swh.indexer.tests.utils import fill_storage
from swh.model.model import (
Origin,
OriginVisit,
OriginVisitStatus,
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.storage.utils import now
@pytest.fixture
def swh_indexer_config(swh_indexer_config):
config = copy.deepcopy(swh_indexer_config)
config.update(
{
"tools": {
"name": "origin-metadata",
"version": "0.0.1",
"configuration": {},
},
"tasks": {
"revision_intrinsic_metadata": None,
"origin_intrinsic_metadata": None,
},
}
)
return config
class OriginHeadTestIndexer(OriginHeadIndexer):
"""Specific indexer whose configuration is enough to satisfy the
indexing tests.
"""
def persist_index_computations(self, results):
self.results = results
+SAMPLE_SNAPSHOT = Snapshot(
+ branches={
+ b"foo": None,
+ b"HEAD": SnapshotBranch(target_type=TargetType.ALIAS, target=b"foo",),
+ },
+)
+
+
class OriginHead(unittest.TestCase):
@pytest.fixture(autouse=True)
def init(self, swh_config):
super().setUp()
self.indexer = OriginHeadTestIndexer()
self.indexer.catch_exceptions = False
fill_storage(self.indexer.storage)
def test_git(self):
origin_url = "https://github.com/SoftwareHeritage/swh-storage"
self.indexer.run([origin_url])
rev_id = b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}\xac\xefrm"
self.assertEqual(
self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}],
)
def test_git_partial_snapshot(self):
"""Checks partial snapshots are ignored."""
origin_url = "https://github.com/SoftwareHeritage/swh-core"
self.indexer.storage.origin_add([Origin(url=origin_url)])
visit = self.indexer.storage.origin_visit_add(
[
OriginVisit(
origin=origin_url,
date=datetime(2019, 2, 27, tzinfo=timezone.utc),
type="git",
)
]
)[0]
- self.indexer.storage.snapshot_add(
- [
- Snapshot(
- branches={
- b"foo": None,
- b"HEAD": SnapshotBranch(
- target_type=TargetType.ALIAS, target=b"foo",
- ),
- },
- ),
- ]
- )
+ self.indexer.storage.snapshot_add([SAMPLE_SNAPSHOT])
visit_status = OriginVisitStatus(
origin=origin_url,
visit=visit.visit,
date=now(),
status="partial",
- snapshot=b"foo",
+ snapshot=SAMPLE_SNAPSHOT.id,
)
self.indexer.storage.origin_visit_status_add([visit_status])
self.indexer.run([origin_url])
self.assertEqual(self.indexer.results, [])
def test_vcs_missing_snapshot(self):
origin_url = "https://github.com/SoftwareHeritage/swh-indexer"
self.indexer.storage.origin_add([Origin(url=origin_url)])
self.indexer.run([origin_url])
self.assertEqual(self.indexer.results, [])
def test_pypi_missing_branch(self):
origin_url = "https://pypi.org/project/abcdef/"
self.indexer.storage.origin_add([Origin(url=origin_url,)])
visit = self.indexer.storage.origin_visit_add(
[
OriginVisit(
origin=origin_url,
date=datetime(2019, 2, 27, tzinfo=timezone.utc),
type="pypi",
)
]
)[0]
- self.indexer.storage.snapshot_add(
- [
- Snapshot(
- branches={
- b"foo": None,
- b"HEAD": SnapshotBranch(
- target_type=TargetType.ALIAS, target=b"foo",
- ),
- },
- )
- ]
- )
+ self.indexer.storage.snapshot_add([SAMPLE_SNAPSHOT])
visit_status = OriginVisitStatus(
origin=origin_url,
visit=visit.visit,
date=now(),
status="full",
- snapshot=b"foo",
+ snapshot=SAMPLE_SNAPSHOT.id,
)
self.indexer.storage.origin_visit_status_add([visit_status])
self.indexer.run(["https://pypi.org/project/abcdef/"])
self.assertEqual(self.indexer.results, [])
def test_ftp(self):
origin_url = "rsync://ftp.gnu.org/gnu/3dldf"
self.indexer.run([origin_url])
rev_id = b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee\xcc\x1a\xb4`\x8c\x8by"
self.assertEqual(
self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}],
)
def test_ftp_missing_snapshot(self):
origin_url = "rsync://ftp.gnu.org/gnu/foobar"
self.indexer.storage.origin_add([Origin(url=origin_url)])
self.indexer.run([origin_url])
self.assertEqual(self.indexer.results, [])
def test_deposit(self):
origin_url = "https://forge.softwareheritage.org/source/jesuisgpl/"
self.indexer.storage.origin_add([Origin(url=origin_url)])
self.indexer.run([origin_url])
rev_id = b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{\xa6\xe9\x99\xb1\x9e]q\xeb"
self.assertEqual(
self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}],
)
def test_deposit_missing_snapshot(self):
origin_url = "https://forge.softwareheritage.org/source/foobar"
self.indexer.storage.origin_add([Origin(url=origin_url,)])
self.indexer.run([origin_url])
self.assertEqual(self.indexer.results, [])
def test_pypi(self):
origin_url = "https://pypi.org/project/limnoria/"
self.indexer.run([origin_url])
rev_id = b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t"
self.assertEqual(
self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url}],
)
def test_svn(self):
origin_url = "http://0-512-md.googlecode.com/svn/"
self.indexer.run([origin_url])
rev_id = b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8\xc9\xad#.\x1bw=\x18"
self.assertEqual(
self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}],
)
diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py
index 63ce2aa..a555e9c 100644
--- a/swh/indexer/tests/test_origin_metadata.py
+++ b/swh/indexer/tests/test_origin_metadata.py
@@ -1,220 +1,255 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import copy
from unittest.mock import patch
+import pytest
+
from swh.indexer.metadata import OriginMetadataIndexer
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.indexer.storage.model import (
OriginIntrinsicMetadataRow,
RevisionIntrinsicMetadataRow,
)
from swh.model.model import Origin
from swh.storage.interface import StorageInterface
-from .test_metadata import REVISION_METADATA_CONFIG
+from .test_metadata import TRANSLATOR_TOOL
from .utils import REVISION, YARN_PARSER_METADATA
+@pytest.fixture
+def swh_indexer_config(swh_indexer_config):
+ """Override the "tools" entry of the default indexer configuration."""
+ cfg = copy.deepcopy(swh_indexer_config)
+ cfg["tools"] = TRANSLATOR_TOOL
+ return cfg
+
+
def test_origin_metadata_indexer(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
-
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
origin = "https://github.com/librariesio/yarn-parser"
indexer.run([origin])
- tool = {
- "name": "swh-metadata-translator",
- "version": "0.0.2",
- "configuration": {"context": "NpmMapping", "type": "local"},
- }
+ tool = swh_indexer_config["tools"]
rev_id = REVISION.id
rev_metadata = RevisionIntrinsicMetadataRow(
id=rev_id, tool=tool, metadata=YARN_PARSER_METADATA, mappings=["npm"],
)
origin_metadata = OriginIntrinsicMetadataRow(
id=origin,
tool=tool,
from_revision=rev_id,
metadata=YARN_PARSER_METADATA,
mappings=["npm"],
)
- rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
+ rev_results = list(idx_storage.revision_intrinsic_metadata_get([rev_id]))
for rev_result in rev_results:
assert rev_result.tool
del rev_result.tool["id"]
assert rev_results == [rev_metadata]
- orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
+ orig_results = list(idx_storage.origin_intrinsic_metadata_get([origin]))
for orig_result in orig_results:
assert orig_result.tool
del orig_result.tool["id"]
assert orig_results == [origin_metadata]
def test_origin_metadata_indexer_duplicate_origin(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
indexer.storage = storage
indexer.idx_storage = idx_storage
indexer.run(["https://github.com/librariesio/yarn-parser"])
indexer.run(["https://github.com/librariesio/yarn-parser"] * 2)
origin = "https://github.com/librariesio/yarn-parser"
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert len(rev_results) == 1
orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
assert len(orig_results) == 1
def test_origin_metadata_indexer_missing_head(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
storage.origin_add([Origin(url="https://example.com")])
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
indexer.run(["https://example.com"])
origin = "https://example.com"
results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
assert results == []
def test_origin_metadata_indexer_partial_missing_head(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
origin1 = "https://example.com"
origin2 = "https://github.com/librariesio/yarn-parser"
storage.origin_add([Origin(url=origin1)])
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
indexer.run([origin1, origin2])
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert rev_results == [
RevisionIntrinsicMetadataRow(
id=rev_id,
metadata=YARN_PARSER_METADATA,
mappings=["npm"],
tool=rev_results[0].tool,
)
]
orig_results = list(
indexer.idx_storage.origin_intrinsic_metadata_get([origin1, origin2])
)
assert orig_results == [
OriginIntrinsicMetadataRow(
id=origin2,
from_revision=rev_id,
metadata=YARN_PARSER_METADATA,
mappings=["npm"],
tool=orig_results[0].tool,
)
]
def test_origin_metadata_indexer_duplicate_revision(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
indexer.storage = storage
indexer.idx_storage = idx_storage
indexer.catch_exceptions = False
origin1 = "https://github.com/librariesio/yarn-parser"
origin2 = "https://github.com/librariesio/yarn-parser.git"
indexer.run([origin1, origin2])
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert len(rev_results) == 1
orig_results = list(
indexer.idx_storage.origin_intrinsic_metadata_get([origin1, origin2])
)
assert len(orig_results) == 2
def test_origin_metadata_indexer_no_metadata_file(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
origin = "https://github.com/librariesio/yarn-parser"
with patch("swh.indexer.metadata_dictionary.npm.NpmMapping.filename", b"foo.json"):
indexer.run([origin])
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert rev_results == []
orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
assert orig_results == []
def test_origin_metadata_indexer_no_metadata(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
origin = "https://github.com/librariesio/yarn-parser"
with patch(
"swh.indexer.metadata.RevisionMetadataIndexer"
".translate_revision_intrinsic_metadata",
return_value=(["npm"], {"@context": "foo"}),
):
indexer.run([origin])
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert rev_results == []
orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
assert orig_results == []
def test_origin_metadata_indexer_error(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
origin = "https://github.com/librariesio/yarn-parser"
with patch(
"swh.indexer.metadata.RevisionMetadataIndexer"
".translate_revision_intrinsic_metadata",
return_value=None,
):
indexer.run([origin])
rev_id = REVISION.id
rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id]))
assert rev_results == []
orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin]))
assert orig_results == []
def test_origin_metadata_indexer_unknown_origin(
- idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage
+ swh_indexer_config,
+ idx_storage: IndexerStorageInterface,
+ storage: StorageInterface,
+ obj_storage,
) -> None:
- indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG)
+ indexer = OriginMetadataIndexer(config=swh_indexer_config)
result = indexer.index_list(["https://unknown.org/foo"])
assert not result
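Reviewer note (not part of the patch): the swh_indexer_config fixture overrides in test_origin_head.py and test_origin_metadata.py deep-copy the base configuration before changing it, so the shared default is never mutated between tests. A small sketch of the pattern, with a hypothetical `override_tools` helper and made-up configuration values:

```python
import copy
from typing import Any, Dict


def override_tools(
    base_config: Dict[str, Any], tools: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a copy of `base_config` whose "tools" entry is replaced."""
    cfg = copy.deepcopy(base_config)  # nested dicts are copied as well
    cfg["tools"] = tools
    return cfg


# Hypothetical values, for illustration only.
base = {
    "tools": {"name": "base-tool", "version": "0.0.1", "configuration": {}},
    "tasks": {"origin_intrinsic_metadata": None},
}
derived = override_tools(
    base, {"name": "other-tool", "version": "0.0.2", "configuration": {}}
)
```

Because the copy is deep, later changes to nested entries of `derived` (for example `derived["tasks"]`) do not leak back into `base`.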