diff --git a/requirements-swh.txt b/requirements-swh.txt index fd8a344..52654a7 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,6 +1,6 @@ -swh.core[db,http] >= 0.14.0 +swh.core[db,http] >= 2.9 swh.model >= 0.0.15 swh.objstorage >= 0.2.2 swh.scheduler >= 0.5.2 swh.storage >= 0.22.0 swh.journal >= 0.1.0 diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py index 1dc9fc2..470cf79 100644 --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -1,757 +1,755 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from importlib import import_module import json from typing import Dict, Iterable, List, Optional, Tuple, Union import warnings import psycopg2 import psycopg2.pool from swh.core.db.common import db_transaction from swh.indexer.storage.interface import IndexerStorageInterface from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import SHA1_SIZE from swh.storage.exc import StorageDBError from swh.storage.utils import get_partition_bounds_bytes from . import converters from .db import Db from .exc import DuplicateId, IndexerStorageArgumentException from .interface import PagedResult, Sha1 from .metrics import process_metrics, send_metric, timed from .model import ( ContentCtagsRow, ContentLanguageRow, ContentLicenseRow, ContentMetadataRow, ContentMimetypeRow, DirectoryIntrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from .writer import JournalWriter INDEXER_CFG_KEY = "indexer_storage" MAPPING_NAMES = ["cff", "codemeta", "gemspec", "maven", "npm", "pkg-info"] SERVER_IMPLEMENTATIONS: Dict[str, str] = { "postgresql": ".IndexerStorage", "remote": ".api.client.RemoteStorage", "memory": ".in_memory.IndexerStorage", # deprecated "local": ".IndexerStorage", } def get_indexer_storage(cls: str, **kwargs) -> IndexerStorageInterface: """Instantiate an indexer storage implementation of class `cls` with arguments `kwargs`. Args: cls: indexer storage class (local, remote or memory) kwargs: dictionary of arguments passed to the indexer storage class constructor Returns: an instance of swh.indexer.storage Raises: ValueError if passed an unknown storage class. """ if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] class_path = SERVER_IMPLEMENTATIONS.get(cls) if class_path is None: raise ValueError( f"Unknown indexer storage class `{cls}`. " f"Supported: {', '.join(SERVER_IMPLEMENTATIONS)}" ) (module_path, class_name) = class_path.rsplit(".", 1) module = import_module(module_path if module_path else ".", package=__package__) BackendClass = getattr(module, class_name) check_config = kwargs.pop("check_config", {}) idx_storage = BackendClass(**kwargs) if check_config: if not idx_storage.check_config(**check_config): raise EnvironmentError("Indexer storage check config failed") return idx_storage def check_id_duplicates(data): """ If any two row models in `data` have the same unique key, raises a `ValueError`. Values associated to the key must be hashable. Args: data (List[dict]): List of dictionaries to be inserted >>> check_id_duplicates([ ... ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"), ... ContentLanguageRow(id=b'foo', indexer_configuration_id=32, lang="python"), ... ]) >>> check_id_duplicates([ ... ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"), ... ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"), ... ]) Traceback (most recent call last): ... swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42}] """ # noqa counter = Counter(tuple(sorted(item.unique_key().items())) for item in data) duplicates = [id_ for (id_, count) in counter.items() if count >= 2] if duplicates: raise DuplicateId(list(map(dict, duplicates))) class IndexerStorage: - """SWH Indexer Storage""" + """SWH Indexer Storage Datastore""" + + current_version = 134 def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None): """ Args: db: either a libpq connection string, or a psycopg2 connection journal_writer: configuration passed to `swh.journal.writer.get_journal_writer` """ self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer) try: if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = Db(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db ) self._db = None except psycopg2.OperationalError as e: raise StorageDBError(e) def get_db(self): if self._db: return self._db return Db.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() - @db_transaction() - def get_current_version(self, *, db=None, cur=None): - return db.current_version - @timed @db_transaction() def check_config(self, *, check_write, db=None, cur=None): # Check permissions on one of the tables if check_write: check = "INSERT" else: check = "SELECT" cur.execute( "select has_table_privilege(current_user, 'content_mimetype', %s)", # noqa (check,), ) return cur.fetchone()[0] @timed @db_transaction() def content_mimetype_missing( self, mimetypes: Iterable[Dict], db=None, cur=None ) -> List[Tuple[Sha1, int]]: return [obj[0] for obj in db.content_mimetype_missing_from_list(mimetypes, cur)] @timed @db_transaction() def get_partition( self, indexer_type: str, indexer_configuration_id: int, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, with_textual_data=False, db=None, cur=None, ) -> PagedResult[Sha1]: """Retrieve ids of content with `indexer_type` within within partition partition_id bound by limit. Args: **indexer_type**: Type of data content to index (mimetype, language, etc...) **indexer_configuration_id**: The tool used to index data **partition_id**: index of the partition to fetch **nb_partitions**: total number of partitions to split into **page_token**: opaque token used for pagination **limit**: Limit result (default to 1000) **with_textual_data** (bool): Deal with only textual content (True) or all content (all contents by defaults, False) Raises: IndexerStorageArgumentException for; - limit to None - wrong indexer_type provided Returns: PagedResult of Sha1. If next_page_token is None, there is no more data to fetch """ if limit is None: raise IndexerStorageArgumentException("limit should not be None") if indexer_type not in db.content_indexer_names: err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]" raise IndexerStorageArgumentException(err) start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE) if page_token is not None: start = hash_to_bytes(page_token) if end is None: end = b"\xff" * SHA1_SIZE next_page_token: Optional[str] = None ids = [ row[0] for row in db.content_get_range( indexer_type, start, end, indexer_configuration_id, limit=limit + 1, with_textual_data=with_textual_data, cur=cur, ) ] if len(ids) >= limit: next_page_token = hash_to_hex(ids[-1]) ids = ids[:limit] assert len(ids) <= limit return PagedResult(results=ids, next_page_token=next_page_token) @timed @db_transaction() def content_mimetype_get_partition( self, indexer_configuration_id: int, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, db=None, cur=None, ) -> PagedResult[Sha1]: return self.get_partition( "mimetype", indexer_configuration_id, partition_id, nb_partitions, page_token=page_token, limit=limit, db=db, cur=cur, ) @timed @process_metrics @db_transaction() def content_mimetype_add( self, mimetypes: List[ContentMimetypeRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(mimetypes) mimetypes.sort(key=lambda m: m.id) self.journal_writer.write_additions("content_mimetype", mimetypes) db.mktemp_content_mimetype(cur) db.copy_to( [m.to_dict() for m in mimetypes], "tmp_content_mimetype", ["id", "mimetype", "encoding", "indexer_configuration_id"], cur, ) count = db.content_mimetype_add_from_temp(cur) return {"content_mimetype:add": count} @timed @db_transaction() def content_mimetype_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[ContentMimetypeRow]: return [ ContentMimetypeRow.from_dict( converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c))) ) for c in db.content_mimetype_get_from_list(ids, cur) ] @timed @db_transaction() def content_language_missing( self, languages: Iterable[Dict], db=None, cur=None ) -> List[Tuple[Sha1, int]]: return [obj[0] for obj in db.content_language_missing_from_list(languages, cur)] @timed @db_transaction() def content_language_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[ContentLanguageRow]: return [ ContentLanguageRow.from_dict( converters.db_to_language(dict(zip(db.content_language_cols, c))) ) for c in db.content_language_get_from_list(ids, cur) ] @timed @process_metrics @db_transaction() def content_language_add( self, languages: List[ContentLanguageRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(languages) languages.sort(key=lambda m: m.id) self.journal_writer.write_additions("content_language", languages) db.mktemp_content_language(cur) # empty language is mapped to 'unknown' db.copy_to( ( { "id": lang.id, "lang": lang.lang or "unknown", "indexer_configuration_id": lang.indexer_configuration_id, } for lang in languages ), "tmp_content_language", ["id", "lang", "indexer_configuration_id"], cur, ) count = db.content_language_add_from_temp(cur) return {"content_language:add": count} @timed @db_transaction() def content_ctags_missing( self, ctags: Iterable[Dict], db=None, cur=None ) -> List[Tuple[Sha1, int]]: return [obj[0] for obj in db.content_ctags_missing_from_list(ctags, cur)] @timed @db_transaction() def content_ctags_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[ContentCtagsRow]: return [ ContentCtagsRow.from_dict( converters.db_to_ctags(dict(zip(db.content_ctags_cols, c))) ) for c in db.content_ctags_get_from_list(ids, cur) ] @timed @process_metrics @db_transaction() def content_ctags_add( self, ctags: List[ContentCtagsRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(ctags) ctags.sort(key=lambda m: m.id) self.journal_writer.write_additions("content_ctags", ctags) db.mktemp_content_ctags(cur) db.copy_to( [ctag.to_dict() for ctag in ctags], tblname="tmp_content_ctags", columns=["id", "name", "kind", "line", "lang", "indexer_configuration_id"], cur=cur, ) count = db.content_ctags_add_from_temp(cur) return {"content_ctags:add": count} @timed @db_transaction() def content_ctags_search( self, expression: str, limit: int = 10, last_sha1: Optional[Sha1] = None, db=None, cur=None, ) -> List[ContentCtagsRow]: return [ ContentCtagsRow.from_dict( converters.db_to_ctags(dict(zip(db.content_ctags_cols, obj))) ) for obj in db.content_ctags_search(expression, last_sha1, limit, cur=cur) ] @timed @db_transaction() def content_fossology_license_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[ContentLicenseRow]: return [ ContentLicenseRow.from_dict( converters.db_to_fossology_license( dict(zip(db.content_fossology_license_cols, c)) ) ) for c in db.content_fossology_license_get_from_list(ids, cur) ] @timed @process_metrics @db_transaction() def content_fossology_license_add( self, licenses: List[ContentLicenseRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(licenses) licenses.sort(key=lambda m: m.id) self.journal_writer.write_additions("content_fossology_license", licenses) db.mktemp_content_fossology_license(cur) db.copy_to( [license.to_dict() for license in licenses], tblname="tmp_content_fossology_license", columns=["id", "license", "indexer_configuration_id"], cur=cur, ) count = db.content_fossology_license_add_from_temp(cur) return {"content_fossology_license:add": count} @timed @db_transaction() def content_fossology_license_get_partition( self, indexer_configuration_id: int, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, db=None, cur=None, ) -> PagedResult[Sha1]: return self.get_partition( "fossology_license", indexer_configuration_id, partition_id, nb_partitions, page_token=page_token, limit=limit, with_textual_data=True, db=db, cur=cur, ) @timed @db_transaction() def content_metadata_missing( self, metadata: Iterable[Dict], db=None, cur=None ) -> List[Tuple[Sha1, int]]: return [obj[0] for obj in db.content_metadata_missing_from_list(metadata, cur)] @timed @db_transaction() def content_metadata_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[ContentMetadataRow]: return [ ContentMetadataRow.from_dict( converters.db_to_metadata(dict(zip(db.content_metadata_cols, c))) ) for c in db.content_metadata_get_from_list(ids, cur) ] @timed @process_metrics @db_transaction() def content_metadata_add( self, metadata: List[ContentMetadataRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) self.journal_writer.write_additions("content_metadata", metadata) db.mktemp_content_metadata(cur) db.copy_to( [m.to_dict() for m in metadata], "tmp_content_metadata", ["id", "metadata", "indexer_configuration_id"], cur, ) count = db.content_metadata_add_from_temp(cur) return { "content_metadata:add": count, } @timed @db_transaction() def directory_intrinsic_metadata_missing( self, metadata: Iterable[Dict], db=None, cur=None ) -> List[Tuple[Sha1, int]]: return [ obj[0] for obj in db.directory_intrinsic_metadata_missing_from_list(metadata, cur) ] @timed @db_transaction() def directory_intrinsic_metadata_get( self, ids: Iterable[Sha1], db=None, cur=None ) -> List[DirectoryIntrinsicMetadataRow]: return [ DirectoryIntrinsicMetadataRow.from_dict( converters.db_to_metadata( dict(zip(db.directory_intrinsic_metadata_cols, c)) ) ) for c in db.directory_intrinsic_metadata_get_from_list(ids, cur) ] @timed @process_metrics @db_transaction() def directory_intrinsic_metadata_add( self, metadata: List[DirectoryIntrinsicMetadataRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) self.journal_writer.write_additions("directory_intrinsic_metadata", metadata) db.mktemp_directory_intrinsic_metadata(cur) db.copy_to( [m.to_dict() for m in metadata], "tmp_directory_intrinsic_metadata", ["id", "metadata", "mappings", "indexer_configuration_id"], cur, ) count = db.directory_intrinsic_metadata_add_from_temp(cur) return { "directory_intrinsic_metadata:add": count, } @timed @db_transaction() def origin_intrinsic_metadata_get( self, urls: Iterable[str], db=None, cur=None ) -> List[OriginIntrinsicMetadataRow]: return [ OriginIntrinsicMetadataRow.from_dict( converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, c)) ) ) for c in db.origin_intrinsic_metadata_get_from_list(urls, cur) ] @timed @process_metrics @db_transaction() def origin_intrinsic_metadata_add( self, metadata: List[OriginIntrinsicMetadataRow], db=None, cur=None, ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) self.journal_writer.write_additions("origin_intrinsic_metadata", metadata) db.mktemp_origin_intrinsic_metadata(cur) db.copy_to( [m.to_dict() for m in metadata], "tmp_origin_intrinsic_metadata", [ "id", "metadata", "indexer_configuration_id", "from_directory", "mappings", ], cur, ) count = db.origin_intrinsic_metadata_add_from_temp(cur) return { "origin_intrinsic_metadata:add": count, } @timed @db_transaction() def origin_intrinsic_metadata_search_fulltext( self, conjunction: List[str], limit: int = 100, db=None, cur=None ) -> List[OriginIntrinsicMetadataRow]: return [ OriginIntrinsicMetadataRow.from_dict( converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, c)) ) ) for c in db.origin_intrinsic_metadata_search_fulltext( conjunction, limit=limit, cur=cur ) ] @timed @db_transaction() def origin_intrinsic_metadata_search_by_producer( self, page_token: str = "", limit: int = 100, ids_only: bool = False, mappings: Optional[List[str]] = None, tool_ids: Optional[List[int]] = None, db=None, cur=None, ) -> PagedResult[Union[str, OriginIntrinsicMetadataRow]]: assert isinstance(page_token, str) # we go to limit+1 to check whether we should add next_page_token in # the response rows = db.origin_intrinsic_metadata_search_by_producer( page_token, limit + 1, ids_only, mappings, tool_ids, cur ) next_page_token = None if ids_only: results = [origin for (origin,) in rows] if len(results) > limit: results[limit:] = [] next_page_token = results[-1] else: results = [ OriginIntrinsicMetadataRow.from_dict( converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, row)) ) ) for row in rows ] if len(results) > limit: results[limit:] = [] next_page_token = results[-1].id return PagedResult( results=results, next_page_token=next_page_token, ) @timed @db_transaction() def origin_intrinsic_metadata_stats(self, db=None, cur=None): mapping_names = [m for m in MAPPING_NAMES] select_parts = [] # Count rows for each mapping for mapping_name in mapping_names: select_parts.append( ( "sum(case when (mappings @> ARRAY['%s']) " " then 1 else 0 end)" ) % mapping_name ) # Total select_parts.append("sum(1)") # Rows whose metadata has at least one key that is not '@context' select_parts.append( "sum(case when ('{}'::jsonb @> (metadata - '@context')) " " then 0 else 1 end)" ) cur.execute( "select " + ", ".join(select_parts) + " from origin_intrinsic_metadata" ) results = dict(zip(mapping_names + ["total", "non_empty"], cur.fetchone())) return { "total": results.pop("total"), "non_empty": results.pop("non_empty"), "per_mapping": results, } @timed @db_transaction() def indexer_configuration_add(self, tools, db=None, cur=None): db.mktemp_indexer_configuration(cur) db.copy_to( tools, "tmp_indexer_configuration", ["tool_name", "tool_version", "tool_configuration"], cur, ) tools = db.indexer_configuration_add_from_temp(cur) results = [dict(zip(db.indexer_configuration_cols, line)) for line in tools] send_metric( "indexer_configuration:add", len(results), method_name="indexer_configuration_add", ) return results @timed @db_transaction() def indexer_configuration_get(self, tool, db=None, cur=None): tool_conf = tool["tool_configuration"] if isinstance(tool_conf, dict): tool_conf = json.dumps(tool_conf) idx = db.indexer_configuration_get( tool["tool_name"], tool["tool_version"], tool_conf ) if not idx: return None return dict(zip(db.indexer_configuration_cols, idx)) @db_transaction() def _tool_get_from_id(self, id_, db, cur): tool = dict( zip( db.indexer_configuration_cols, db.indexer_configuration_get_from_id(id_, cur), ) ) return { "id": tool["id"], "name": tool["tool_name"], "version": tool["tool_version"], "configuration": tool["tool_configuration"], } diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py index c8dbb12..cb0ae0a 100644 --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -1,535 +1,534 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Iterable, Iterator, List from swh.core.db import BaseDb from swh.core.db.db_utils import execute_values_generator, stored_procedure from swh.model import hashutil from .interface import Sha1 class Db(BaseDb): """Proxy to the SWH Indexer DB, with wrappers around stored procedures""" content_mimetype_hash_keys = ["id", "indexer_configuration_id"] - current_version = 134 def _missing_from_list( self, table: str, data: Iterable[Dict], hash_keys: List[str], cur=None ): """Read from table the data with hash_keys that are missing. Args: table: Table name (e.g content_mimetype, content_language, etc...) data: Dict of data to read from hash_keys: List of keys to read in the data dict. Yields: The data which is missing from the db. """ cur = self._cursor(cur) keys = ", ".join(hash_keys) equality = " AND ".join(("t.%s = c.%s" % (key, key)) for key in hash_keys) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(%s) where not exists ( select 1 from %s c where %s ) """ % (keys, keys, table, equality), (tuple(m[k] for k in hash_keys) for m in data), ) def content_mimetype_missing_from_list( self, mimetypes: Iterable[Dict], cur=None ) -> Iterator[Sha1]: """List missing mimetypes.""" yield from self._missing_from_list( "content_mimetype", mimetypes, self.content_mimetype_hash_keys, cur=cur ) content_mimetype_cols = [ "id", "mimetype", "encoding", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_mimetype") def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_mimetype_add()") return cur.fetchone()[0] def _convert_key(self, key, main_table="c"): """Convert keys according to specific use in the module. Args: key (str): Key expression to change according to the alias used in the query main_table (str): Alias to use for the main table. Default to c for content_{something}. Expected: Tables content_{something} being aliased as 'c' (something in {language, mimetype, ...}), table indexer_configuration being aliased as 'i'. """ if key == "id": return "%s.id" % main_table elif key == "tool_id": return "i.id as tool_id" elif key == "license": return ( """ ( select name from fossology_license where id = %s.license_id ) as licenses""" % main_table ) return key def _get_from_list(self, table, ids, cols, cur=None, id_col="id"): """Fetches entries from the `table` such that their `id` field (or whatever is given to `id_col`) is in `ids`. Returns the columns `cols`. The `cur` parameter is used to connect to the database. """ cur = self._cursor(cur) keys = map(self._convert_key, cols) query = """ select {keys} from (values %s) as t(id) inner join {table} c on c.{id_col}=t.id inner join indexer_configuration i on c.indexer_configuration_id=i.id; """.format( keys=", ".join(keys), id_col=id_col, table=table ) yield from execute_values_generator(cur, query, ((_id,) for _id in ids)) content_indexer_names = { "mimetype": "content_mimetype", "fossology_license": "content_fossology_license", } def content_get_range( self, content_type, start, end, indexer_configuration_id, limit=1000, with_textual_data=False, cur=None, ): """Retrieve contents with content_type, within range [start, end] bound by limit and associated to the given indexer configuration id. When asking to work on textual content, that filters on the mimetype table with any mimetype that is not binary. """ cur = self._cursor(cur) table = self.content_indexer_names[content_type] if with_textual_data: extra = """inner join content_mimetype cm on (t.id=cm.id and cm.mimetype like 'text/%%' and %(start)s <= cm.id and cm.id <= %(end)s) """ else: extra = "" query = f"""select t.id from {table} t {extra} where t.indexer_configuration_id=%(tool_id)s and %(start)s <= t.id and t.id <= %(end)s order by t.indexer_configuration_id, t.id limit %(limit)s""" cur.execute( query, { "start": start, "end": end, "tool_id": indexer_configuration_id, "limit": limit, }, ) yield from cur def content_mimetype_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_mimetype", ids, self.content_mimetype_cols, cur=cur ) content_language_hash_keys = ["id", "indexer_configuration_id"] def content_language_missing_from_list(self, languages, cur=None): """List missing languages.""" yield from self._missing_from_list( "content_language", languages, self.content_language_hash_keys, cur=cur ) content_language_cols = [ "id", "lang", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_language") def mktemp_content_language(self, cur=None): pass def content_language_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_language_add()") return cur.fetchone()[0] def content_language_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_language", ids, self.content_language_cols, cur=cur ) content_ctags_hash_keys = ["id", "indexer_configuration_id"] def content_ctags_missing_from_list(self, ctags, cur=None): """List missing ctags.""" yield from self._missing_from_list( "content_ctags", ctags, self.content_ctags_hash_keys, cur=cur ) content_ctags_cols = [ "id", "name", "kind", "line", "lang", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_ctags") def mktemp_content_ctags(self, cur=None): pass def content_ctags_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_ctags_add()") return cur.fetchone()[0] def content_ctags_get_from_list(self, ids, cur=None): cur = self._cursor(cur) keys = map(self._convert_key, self.content_ctags_cols) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(id) inner join content_ctags c on c.id=t.id inner join indexer_configuration i on c.indexer_configuration_id=i.id order by line """ % ", ".join(keys), ((_id,) for _id in ids), ) def content_ctags_search(self, expression, last_sha1, limit, cur=None): cur = self._cursor(cur) if not last_sha1: query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s)""" % ( ",".join(self.content_ctags_cols) ) cur.execute(query, (expression, limit)) else: if last_sha1 and isinstance(last_sha1, bytes): last_sha1 = "\\x%s" % hashutil.hash_to_hex(last_sha1) elif last_sha1: last_sha1 = "\\x%s" % last_sha1 query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s, %%s)""" % ( ",".join(self.content_ctags_cols) ) cur.execute(query, (expression, limit, last_sha1)) yield from cur content_fossology_license_cols = [ "id", "tool_id", "tool_name", "tool_version", "tool_configuration", "license", ] @stored_procedure("swh_mktemp_content_fossology_license") def mktemp_content_fossology_license(self, cur=None): pass def content_fossology_license_add_from_temp(self, cur=None): """Add new licenses per content.""" cur = self._cursor(cur) cur.execute("select * from swh_content_fossology_license_add()") return cur.fetchone()[0] def content_fossology_license_get_from_list(self, ids, cur=None): """Retrieve licenses per id.""" cur = self._cursor(cur) keys = map(self._convert_key, self.content_fossology_license_cols) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(id) inner join content_fossology_license c on t.id=c.id inner join indexer_configuration i on i.id=c.indexer_configuration_id """ % ", ".join(keys), ((_id,) for _id in ids), ) content_metadata_hash_keys = ["id", "indexer_configuration_id"] def content_metadata_missing_from_list(self, metadata, cur=None): """List missing metadata.""" yield from self._missing_from_list( "content_metadata", metadata, self.content_metadata_hash_keys, cur=cur ) content_metadata_cols = [ "id", "metadata", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_metadata") def mktemp_content_metadata(self, cur=None): pass def content_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_metadata_add()") return cur.fetchone()[0] def content_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_metadata", ids, self.content_metadata_cols, cur=cur ) directory_intrinsic_metadata_hash_keys = ["id", "indexer_configuration_id"] def directory_intrinsic_metadata_missing_from_list(self, metadata, cur=None): """List missing metadata.""" yield from self._missing_from_list( "directory_intrinsic_metadata", metadata, self.directory_intrinsic_metadata_hash_keys, cur=cur, ) directory_intrinsic_metadata_cols = [ "id", "metadata", "mappings", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_directory_intrinsic_metadata") def mktemp_directory_intrinsic_metadata(self, cur=None): pass def directory_intrinsic_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_directory_intrinsic_metadata_add()") return cur.fetchone()[0] def directory_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "directory_intrinsic_metadata", ids, self.directory_intrinsic_metadata_cols, cur=cur, ) origin_intrinsic_metadata_cols = [ "id", "metadata", "from_directory", "mappings", "tool_id", "tool_name", "tool_version", "tool_configuration", ] origin_intrinsic_metadata_regconfig = "pg_catalog.simple" """The dictionary used to normalize 'metadata' and queries. 'pg_catalog.simple' provides no stopword, so it should be suitable for proper names and non-English content. When updating this value, make sure to add a new index on origin_intrinsic_metadata.metadata.""" @stored_procedure("swh_mktemp_origin_intrinsic_metadata") def mktemp_origin_intrinsic_metadata(self, cur=None): pass def origin_intrinsic_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_origin_intrinsic_metadata_add()") return cur.fetchone()[0] def origin_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "origin_intrinsic_metadata", ids, self.origin_intrinsic_metadata_cols, cur=cur, id_col="id", ) def origin_intrinsic_metadata_search_fulltext(self, terms, *, limit, cur): regconfig = self.origin_intrinsic_metadata_regconfig tsquery_template = " && ".join( "plainto_tsquery('%s', %%s)" % regconfig for _ in terms ) tsquery_args = [(term,) for term in terms] keys = ( self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols ) query = ( "SELECT {keys} FROM origin_intrinsic_metadata AS oim " "INNER JOIN indexer_configuration AS i " "ON oim.indexer_configuration_id=i.id " "JOIN LATERAL (SELECT {tsquery_template}) AS s(tsq) ON true " "WHERE oim.metadata_tsvector @@ tsq " "ORDER BY ts_rank(oim.metadata_tsvector, tsq, 1) DESC " "LIMIT %s;" ).format(keys=", ".join(keys), tsquery_template=tsquery_template) cur.execute(query, tsquery_args + [limit]) yield from cur def origin_intrinsic_metadata_search_by_producer( self, last, limit, ids_only, mappings, tool_ids, cur ): if ids_only: keys = "oim.id" else: keys = ", ".join( ( self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols ) ) query_parts = [ "SELECT %s" % keys, "FROM origin_intrinsic_metadata AS oim", "INNER JOIN indexer_configuration AS i", "ON oim.indexer_configuration_id=i.id", ] args = [] where = [] if last: where.append("oim.id > %s") args.append(last) if mappings is not None: where.append("oim.mappings && %s") args.append(list(mappings)) if tool_ids is not None: where.append("oim.indexer_configuration_id = ANY(%s)") args.append(list(tool_ids)) if where: query_parts.append("WHERE") query_parts.append(" AND ".join(where)) if limit: query_parts.append("LIMIT %s") args.append(limit) cur.execute(" ".join(query_parts), args) yield from cur indexer_configuration_cols = [ "id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_indexer_configuration") def mktemp_indexer_configuration(self, cur=None): pass def indexer_configuration_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute( "SELECT %s from swh_indexer_configuration_add()" % (",".join(self.indexer_configuration_cols),) ) yield from cur def indexer_configuration_get( self, tool_name, tool_version, tool_configuration, cur=None ): cur = self._cursor(cur) cur.execute( """select %s from indexer_configuration where tool_name=%%s and tool_version=%%s and tool_configuration=%%s""" % (",".join(self.indexer_configuration_cols)), (tool_name, tool_version, tool_configuration), ) return cur.fetchone() def indexer_configuration_get_from_id(self, id_, cur=None): cur = self._cursor(cur) cur.execute( """select %s from indexer_configuration where id=%%s""" % (",".join(self.indexer_configuration_cols)), (id_,), ) return cur.fetchone() diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py index cef62d3..b9211ab 100644 --- a/swh/indexer/tests/conftest.py +++ b/swh/indexer/tests/conftest.py @@ -1,133 +1,132 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from functools import partial import os from typing import List, Tuple from unittest.mock import patch import pytest from pytest_postgresql import factories import yaml from swh.core.db.pytest_plugin import initialize_database_for_module -from swh.indexer.storage import get_indexer_storage -from swh.indexer.storage.db import Db as IndexerDb +from swh.indexer.storage import IndexerStorage, get_indexer_storage from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .utils import fill_obj_storage, fill_storage TASK_NAMES: List[Tuple[str, str]] = [ # (scheduler-task-type, task-class-test-name) ("index-directory-metadata", "directory_intrinsic_metadata"), ("index-origin-metadata", "origin_intrinsic_metadata"), ] idx_postgresql_proc = factories.postgresql_proc( load=[ partial( initialize_database_for_module, modname="indexer", - version=IndexerDb.current_version, + version=IndexerStorage.current_version, ) ], ) idx_storage_postgresql = factories.postgresql("idx_postgresql_proc") @pytest.fixture def indexer_scheduler(swh_scheduler): # Insert the expected task types within the scheduler for task_name, task_class_name in TASK_NAMES: swh_scheduler.create_task_type( { "type": task_name, "description": f"The {task_class_name} indexer testing task", "backend_name": f"swh.indexer.tests.tasks.{task_class_name}", "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), "num_retries": 3, } ) return swh_scheduler @pytest.fixture def idx_storage_backend_config(idx_storage_postgresql): """Basic pg storage configuration with no journal collaborator for the indexer storage (to avoid pulling optional dependency on clients of this fixture) """ return { "cls": "local", "db": idx_storage_postgresql.dsn, } @pytest.fixture def swh_indexer_config( swh_storage_backend_config, idx_storage_backend_config, swh_scheduler_config ): return { "storage": swh_storage_backend_config, "objstorage": {"cls": "memory"}, "indexer_storage": idx_storage_backend_config, "scheduler": {"cls": "local", **swh_scheduler_config}, "tools": { "name": "file", "version": "1:5.30-1+deb9u1", "configuration": {"type": "library", "debian-package": "python3-magic"}, }, "compute_checksums": ["blake2b512"], # for rehash indexer } @pytest.fixture def idx_storage(swh_indexer_config): """An instance of in-memory indexer storage that gets injected into all indexers classes. """ idx_storage_config = swh_indexer_config["indexer_storage"] return get_indexer_storage(**idx_storage_config) @pytest.fixture def storage(swh_indexer_config): """An instance of in-memory storage that gets injected into all indexers classes. """ storage = get_storage(**swh_indexer_config["storage"]) fill_storage(storage) return storage @pytest.fixture def obj_storage(swh_indexer_config): """An instance of in-memory objstorage that gets injected into all indexers classes. """ objstorage = get_objstorage(**swh_indexer_config["objstorage"]) fill_obj_storage(objstorage) with patch.dict( "swh.objstorage.factory._STORAGE_CLASSES", {"memory": lambda: objstorage} ): yield objstorage @pytest.fixture def swh_config(swh_indexer_config, monkeypatch, tmp_path): conffile = os.path.join(str(tmp_path), "indexer.yml") with open(conffile, "w") as f: f.write(yaml.dump(swh_indexer_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) return conffile