diff --git a/swh/indexer/sql/50-func.sql b/swh/indexer/sql/50-func.sql
index d459a4a..f138c07 100644
--- a/swh/indexer/sql/50-func.sql
+++ b/swh/indexer/sql/50-func.sql
@@ -1,487 +1,496 @@
 -- Postgresql index helper function
 create or replace function hash_sha1(text)
     returns text
     language sql strict immutable
 as $$
     select encode(public.digest($1, 'sha1'), 'hex')
 $$;

 comment on function hash_sha1(text) is 'Compute sha1 hash as text';

 -- create a temporary table called tmp_TBLNAME, mimicking existing table
 -- TBLNAME
 --
 -- Args:
 --     tblname: name of the table to mimic
 create or replace function swh_mktemp(tblname regclass)
     returns void
     language plpgsql
 as $$
 begin
     execute format('
       create temporary table if not exists tmp_%1$I
         (like %1$I including defaults)
         on commit delete rows;
       alter table tmp_%1$I drop column if exists object_id;
     ', tblname);
     return;
 end
 $$;

 -- create a temporary table for content_mimetype tmp_content_mimetype,
 create or replace function swh_mktemp_content_mimetype()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_content_mimetype (
     like content_mimetype including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_content_mimetype() IS 'Helper table to add mimetype information';

 -- add tmp_content_mimetype entries to content_mimetype, overwriting duplicates.
 --
 -- If filtering duplicates is in order, the call to
 -- swh_content_mimetype_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(content_mimetype), 1. COPY to
 -- tmp_content_mimetype, 2. call this function
 create or replace function swh_content_mimetype_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id)
     select id, mimetype, encoding, indexer_configuration_id
     from tmp_content_mimetype tcm
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set mimetype = excluded.mimetype,
                   encoding = excluded.encoding;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_content_mimetype_add() IS 'Add new content mimetypes';

 -- add tmp_content_language entries to content_language, overwriting duplicates.
 --
 -- If filtering duplicates is in order, the call to
 -- swh_content_language_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to
 -- tmp_content_language, 2. call this function
 create or replace function swh_content_language_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     insert into content_language (id, lang, indexer_configuration_id)
     select id, lang, indexer_configuration_id
     from tmp_content_language tcl
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set lang = excluded.lang;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_content_language_add() IS 'Add new content languages';

 -- create a temporary table for retrieving content_language
 create or replace function swh_mktemp_content_language()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_content_language (
     like content_language including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_content_language() is 'Helper table to add content language';

 -- create a temporary table for content_ctags tmp_content_ctags,
 create or replace function swh_mktemp_content_ctags()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_content_ctags (
     like content_ctags including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_content_ctags() is 'Helper table to add content ctags';

 -- add tmp_content_ctags entries to content_ctags, overwriting duplicates
 --
 -- operates in bulk: 0. swh_mktemp(content_ctags), 1. COPY to tmp_content_ctags,
 -- 2. call this function
 create or replace function swh_content_ctags_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id)
     select id, name, kind, line, lang, indexer_configuration_id
     from tmp_content_ctags tct
+    order by id, hash_sha1(name), kind, line, lang, indexer_configuration_id
     on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id)
     do nothing;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_content_ctags_add() IS 'Add new ctags symbols per content';

 create type content_ctags_signature as (
   id sha1,
   name text,
   kind text,
   line bigint,
   lang ctags_languages,
   tool_id integer,
   tool_name text,
   tool_version text,
   tool_configuration jsonb
 );

 -- Search within ctags content.
 --
 create or replace function swh_content_ctags_search(
     expression text,
     l integer default 10,
     last_sha1 sha1 default '\x0000000000000000000000000000000000000000')
     returns setof content_ctags_signature
     language sql
 as $$
     select c.id, name, kind, line, lang,
            i.id as tool_id, tool_name, tool_version, tool_configuration
     from content_ctags c
     inner join indexer_configuration i on i.id = c.indexer_configuration_id
     where hash_sha1(name) = hash_sha1(expression)
     and c.id > last_sha1
     order by id
     limit l;
 $$;

 comment on function swh_content_ctags_search(text, integer, sha1) IS 'Equality search through ctags'' symbols';

 -- create a temporary table for content_fossology_license tmp_content_fossology_license,
 create or replace function swh_mktemp_content_fossology_license()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_content_fossology_license (
     id                       sha1,
     license                  text,
     indexer_configuration_id integer
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_content_fossology_license() is 'Helper table to add content license';

 -- add tmp_content_fossology_license entries to content_fossology_license,
 -- overwriting duplicates.
 --
 -- operates in bulk: 0. swh_mktemp(content_fossology_license), 1. COPY to
 -- tmp_content_fossology_license, 2. call this function
 create or replace function swh_content_fossology_license_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     -- insert unknown licenses first
     insert into fossology_license (name)
     select distinct license from tmp_content_fossology_license tmp
     where not exists (select 1 from fossology_license where name=tmp.license)
     on conflict(name) do nothing;

     insert into content_fossology_license (id, license_id, indexer_configuration_id)
     select tcl.id,
            (select id from fossology_license where name = tcl.license) as license,
            indexer_configuration_id
     from tmp_content_fossology_license tcl
+    order by tcl.id, license, indexer_configuration_id
     on conflict(id, license_id, indexer_configuration_id)
     do update set license_id = excluded.license_id;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_content_fossology_license_add() IS 'Add new content licenses';

 -- content_metadata functions

 -- add tmp_content_metadata entries to content_metadata, overwriting duplicates
 --
 -- If filtering duplicates is in order, the call to
 -- swh_content_metadata_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(content_metadata), 1. COPY to
 -- tmp_content_metadata, 2. call this function
 create or replace function swh_content_metadata_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     insert into content_metadata (id, metadata, indexer_configuration_id)
     select id, metadata, indexer_configuration_id
     from tmp_content_metadata tcm
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set metadata = excluded.metadata;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_content_metadata_add() IS 'Add new content metadata';

 -- create a temporary table for retrieving content_metadata
 create or replace function swh_mktemp_content_metadata()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_content_metadata (
     like content_metadata including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_content_metadata() is 'Helper table to add content metadata';

 -- end content_metadata functions

 -- add tmp_directory_intrinsic_metadata entries to directory_intrinsic_metadata,
 -- overwriting duplicates.
 --
 -- If filtering duplicates is in order, the call to
 -- swh_directory_intrinsic_metadata_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(directory_intrinsic_metadata), 1. COPY to
 -- tmp_directory_intrinsic_metadata, 2. call this function
 create or replace function swh_directory_intrinsic_metadata_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     insert into directory_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id)
     select id, metadata, mappings, indexer_configuration_id
     from tmp_directory_intrinsic_metadata tcm
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set
         metadata = excluded.metadata,
         mappings = excluded.mappings;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_directory_intrinsic_metadata_add() IS 'Add new directory intrinsic metadata';

 -- create a temporary table for retrieving directory_intrinsic_metadata
 create or replace function swh_mktemp_directory_intrinsic_metadata()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_directory_intrinsic_metadata (
     like directory_intrinsic_metadata including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_directory_intrinsic_metadata() is 'Helper table to add directory intrinsic metadata';

 -- create a temporary table for retrieving origin_intrinsic_metadata
 create or replace function swh_mktemp_origin_intrinsic_metadata()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_origin_intrinsic_metadata (
     like origin_intrinsic_metadata including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_origin_intrinsic_metadata() is 'Helper table to add origin intrinsic metadata';

 create or replace function swh_mktemp_indexer_configuration()
     returns void
     language sql
 as $$
     create temporary table if not exists tmp_indexer_configuration (
       like indexer_configuration including defaults
     ) on commit delete rows;
     alter table tmp_indexer_configuration drop column if exists id;
 $$;

 -- add tmp_origin_intrinsic_metadata entries to origin_intrinsic_metadata,
 -- overwriting duplicates.
 --
 -- If filtering duplicates is in order, the call to
 -- swh_origin_intrinsic_metadata_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(origin_intrinsic_metadata), 1. COPY to
 -- tmp_origin_intrinsic_metadata, 2. call this function
 create or replace function swh_origin_intrinsic_metadata_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     perform swh_origin_intrinsic_metadata_compute_tsvector();

     insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_directory, metadata_tsvector, mappings)
     select id, metadata, indexer_configuration_id, from_directory,
            metadata_tsvector, mappings
     from tmp_origin_intrinsic_metadata
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set
         metadata = excluded.metadata,
         metadata_tsvector = excluded.metadata_tsvector,
         mappings = excluded.mappings,
         from_directory = excluded.from_directory;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_origin_intrinsic_metadata_add() IS 'Add new origin intrinsic metadata';

 -- Compute the metadata_tsvector column in tmp_origin_intrinsic_metadata.
 --
 -- It uses the "pg_catalog.simple" dictionary, as it has no stopwords,
 -- so it should be suitable for proper names and non-English text.
 create or replace function swh_origin_intrinsic_metadata_compute_tsvector()
     returns void
     language plpgsql
 as $$
 begin
     update tmp_origin_intrinsic_metadata
     set metadata_tsvector = to_tsvector('pg_catalog.simple', metadata);
 end
 $$;

 -- create a temporary table for retrieving origin_extrinsic_metadata
 create or replace function swh_mktemp_origin_extrinsic_metadata()
     returns void
     language sql
 as $$
   create temporary table if not exists tmp_origin_extrinsic_metadata (
     like origin_extrinsic_metadata including defaults
   ) on commit delete rows;
 $$;

 comment on function swh_mktemp_origin_extrinsic_metadata() is 'Helper table to add origin extrinsic metadata';

 create or replace function swh_mktemp_indexer_configuration()
     returns void
     language sql
 as $$
     create temporary table if not exists tmp_indexer_configuration (
       like indexer_configuration including defaults
     ) on commit delete rows;
     alter table tmp_indexer_configuration drop column if exists id;
 $$;

 -- add tmp_origin_extrinsic_metadata entries to origin_extrinsic_metadata,
 -- overwriting duplicates.
 --
 -- If filtering duplicates is in order, the call to
 -- swh_origin_extrinsic_metadata_missing must take place before calling this
 -- function.
 --
 -- operates in bulk: 0. swh_mktemp(origin_extrinsic_metadata), 1. COPY to
 -- tmp_origin_extrinsic_metadata, 2. call this function
 create or replace function swh_origin_extrinsic_metadata_add()
     returns bigint
     language plpgsql
 as $$
 declare
   res bigint;
 begin
     perform swh_origin_extrinsic_metadata_compute_tsvector();

     insert into origin_extrinsic_metadata (id, metadata, indexer_configuration_id, from_remd_id, metadata_tsvector, mappings)
     select id, metadata, indexer_configuration_id, from_remd_id,
            metadata_tsvector, mappings
     from tmp_origin_extrinsic_metadata
+    order by id, indexer_configuration_id
     on conflict(id, indexer_configuration_id)
     do update set
         metadata = excluded.metadata,
         metadata_tsvector = excluded.metadata_tsvector,
         mappings = excluded.mappings,
         from_remd_id = excluded.from_remd_id;

     get diagnostics res = ROW_COUNT;
     return res;
 end
 $$;

 comment on function swh_origin_extrinsic_metadata_add() IS 'Add new origin extrinsic metadata';

 -- Compute the metadata_tsvector column in tmp_origin_extrinsic_metadata.
 --
 -- It uses the "pg_catalog.simple" dictionary, as it has no stopwords,
 -- so it should be suitable for proper names and non-English text.
 create or replace function swh_origin_extrinsic_metadata_compute_tsvector()
     returns void
     language plpgsql
 as $$
 begin
     update tmp_origin_extrinsic_metadata
     set metadata_tsvector = to_tsvector('pg_catalog.simple', metadata);
 end
 $$;

 -- add tmp_indexer_configuration entries to indexer_configuration,
 -- overwriting duplicates if any.
 --
 -- operates in bulk: 0. create temporary tmp_indexer_configuration, 1. COPY to
 -- it, 2. call this function to insert, filtering out duplicates
 create or replace function swh_indexer_configuration_add()
     returns setof indexer_configuration
     language plpgsql
 as $$
 begin
     insert into indexer_configuration(tool_name, tool_version, tool_configuration)
     select tool_name, tool_version, tool_configuration
     from tmp_indexer_configuration tmp
+    order by tool_name, tool_version, tool_configuration
     on conflict(tool_name, tool_version, tool_configuration) do nothing;

     return query
         select id, tool_name, tool_version, tool_configuration
         from tmp_indexer_configuration
         join indexer_configuration
             using(tool_name, tool_version, tool_configuration);

     return;
 end
 $$;
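The new "order by" clauses all sort on the conflict key before the upsert:
INSERT ... ON CONFLICT DO UPDATE takes its row locks one row at a time, so two
workers flushing overlapping batches in different orders could each end up
waiting on a row the other holds, i.e. deadlock; a consistent insertion order
gives every transaction the same lock-acquisition order. Below is a minimal
sketch (not part of the patch) of the three-step bulk protocol the comments
above describe, driven directly over psycopg2. It assumes a database with this
schema loaded and an existing indexer_configuration row with id 1; the DSN and
sample row are made up.

    # Sketch only: bulk-add protocol (0. mktemp, 1. COPY, 2. add).
    import io

    import psycopg2

    conn = psycopg2.connect("dbname=swh-indexer")  # placeholder DSN
    with conn, conn.cursor() as cur:
        # 0. create the per-transaction temporary table
        cur.execute("select swh_mktemp_content_mimetype()")
        # 1. COPY rows into it (text format: tab-separated fields; bytea hex
        # values must be escaped as \\x... on the wire)
        data = io.StringIO("\\\\x" + "00" * 20 + "\ttext/plain\tus-ascii\t1\n")
        cur.copy_expert(
            "copy tmp_content_mimetype"
            " (id, mimetype, encoding, indexer_configuration_id) from stdin",
            data,
        )
        # 2. flush into the real table; rows are upserted in conflict-key
        # order, so concurrent batches cannot deadlock
        cur.execute("select swh_content_mimetype_add()")
        print("rows upserted:", cur.fetchone()[0])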
diff --git a/swh/indexer/sql/upgrades/136.sql b/swh/indexer/sql/upgrades/136.sql
new file mode 100644
index 0000000..01499ac
--- /dev/null
+++ b/swh/indexer/sql/upgrades/136.sql
@@ -0,0 +1,214 @@
+-- SWH Indexer DB schema upgrade
+-- from_version: 135
+-- to_version: 136
+-- description: Insert from temporary tables in consistent order
+
+insert into dbversion(version, release, description)
+  values(136, now(), 'Work In Progress');
+
+
+create or replace function swh_content_mimetype_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id)
+    select id, mimetype, encoding, indexer_configuration_id
+    from tmp_content_mimetype tcm
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set mimetype = excluded.mimetype,
+                  encoding = excluded.encoding;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_content_language_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    insert into content_language (id, lang, indexer_configuration_id)
+    select id, lang, indexer_configuration_id
+    from tmp_content_language tcl
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set lang = excluded.lang;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_content_ctags_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id)
+    select id, name, kind, line, lang, indexer_configuration_id
+    from tmp_content_ctags tct
+    order by id, hash_sha1(name), kind, line, lang, indexer_configuration_id
+    on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id)
+    do nothing;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_content_fossology_license_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    -- insert unknown licenses first
+    insert into fossology_license (name)
+    select distinct license from tmp_content_fossology_license tmp
+    where not exists (select 1 from fossology_license where name=tmp.license)
+    on conflict(name) do nothing;
+
+    insert into content_fossology_license (id, license_id, indexer_configuration_id)
+    select tcl.id,
+           (select id from fossology_license where name = tcl.license) as license,
+           indexer_configuration_id
+    from tmp_content_fossology_license tcl
+    order by tcl.id, license, indexer_configuration_id
+    on conflict(id, license_id, indexer_configuration_id)
+    do update set license_id = excluded.license_id;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_content_metadata_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    insert into content_metadata (id, metadata, indexer_configuration_id)
+    select id, metadata, indexer_configuration_id
+    from tmp_content_metadata tcm
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set metadata = excluded.metadata;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_directory_intrinsic_metadata_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    insert into directory_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id)
+    select id, metadata, mappings, indexer_configuration_id
+    from tmp_directory_intrinsic_metadata tcm
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set
+        metadata = excluded.metadata,
+        mappings = excluded.mappings;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_origin_intrinsic_metadata_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    perform swh_origin_intrinsic_metadata_compute_tsvector();
+
+    insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_directory, metadata_tsvector, mappings)
+    select id, metadata, indexer_configuration_id, from_directory,
+           metadata_tsvector, mappings
+    from tmp_origin_intrinsic_metadata
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set
+        metadata = excluded.metadata,
+        metadata_tsvector = excluded.metadata_tsvector,
+        mappings = excluded.mappings,
+        from_directory = excluded.from_directory;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_origin_extrinsic_metadata_add()
+    returns bigint
+    language plpgsql
+as $$
+declare
+  res bigint;
+begin
+    perform swh_origin_extrinsic_metadata_compute_tsvector();
+
+    insert into origin_extrinsic_metadata (id, metadata, indexer_configuration_id, from_remd_id, metadata_tsvector, mappings)
+    select id, metadata, indexer_configuration_id, from_remd_id,
+           metadata_tsvector, mappings
+    from tmp_origin_extrinsic_metadata
+    order by id, indexer_configuration_id
+    on conflict(id, indexer_configuration_id)
+    do update set
+        metadata = excluded.metadata,
+        metadata_tsvector = excluded.metadata_tsvector,
+        mappings = excluded.mappings,
+        from_remd_id = excluded.from_remd_id;
+
+    get diagnostics res = ROW_COUNT;
+    return res;
+end
+$$;
+
+
+create or replace function swh_indexer_configuration_add()
+    returns setof indexer_configuration
+    language plpgsql
+as $$
+begin
+    insert into indexer_configuration(tool_name, tool_version, tool_configuration)
+    select tool_name, tool_version, tool_configuration
+    from tmp_indexer_configuration tmp
+    order by tool_name, tool_version, tool_configuration
+    on conflict(tool_name, tool_version, tool_configuration) do nothing;
+
+    return query
+        select id, tool_name, tool_version, tool_configuration
+        from tmp_indexer_configuration
+        join indexer_configuration
+            using(tool_name, tool_version, tool_configuration);
+
+    return;
+end
+$$;
+
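The upgrade script only re-declares the nine add functions with the new
"order by" clauses; no table changes are involved, so it can be applied live.
A hedged sketch of applying and verifying it, assuming psql is on PATH and
the placeholder DSN points at an indexer database currently at version 135:

    # Sketch only: run the migration file, then check the recorded version.
    import subprocess

    import psycopg2

    dsn = "dbname=swh-indexer"  # placeholder DSN
    subprocess.run(
        ["psql", dsn, "-v", "ON_ERROR_STOP=1",
         "-f", "swh/indexer/sql/upgrades/136.sql"],
        check=True,
    )
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("select max(version) from dbversion")
        assert cur.fetchone()[0] == 136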
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
index 2c7bbc2..34f1bc3 100644
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -1,722 +1,716 @@
 # Copyright (C) 2015-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import Counter
 from importlib import import_module
 import json
 from typing import Dict, Iterable, List, Optional, Tuple, Union
 import warnings

 import psycopg2
 import psycopg2.pool

 from swh.core.db.common import db_transaction
 from swh.indexer.storage.interface import IndexerStorageInterface
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import SHA1_SIZE
 from swh.storage.exc import StorageDBError
 from swh.storage.utils import get_partition_bounds_bytes

 from . import converters
 from .db import Db
 from .exc import DuplicateId, IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .metrics import process_metrics, send_metric, timed
 from .model import (
     ContentLicenseRow,
     ContentMetadataRow,
     ContentMimetypeRow,
     DirectoryIntrinsicMetadataRow,
     OriginExtrinsicMetadataRow,
     OriginIntrinsicMetadataRow,
 )
 from .writer import JournalWriter

 INDEXER_CFG_KEY = "indexer_storage"

 MAPPING_NAMES = ["cff", "codemeta", "gemspec", "maven", "npm", "pkg-info"]

 SERVER_IMPLEMENTATIONS: Dict[str, str] = {
     "postgresql": ".IndexerStorage",
     "remote": ".api.client.RemoteStorage",
     "memory": ".in_memory.IndexerStorage",
     # deprecated
     "local": ".IndexerStorage",
 }


 def sanitize_json(doc):
     """Recursively replaces NUL characters, as postgresql does not allow
     them in text fields."""
     if isinstance(doc, str):
         return doc.replace("\x00", "")
     elif not hasattr(doc, "__iter__"):
         return doc
     elif isinstance(doc, dict):
         return {sanitize_json(k): sanitize_json(v) for (k, v) in doc.items()}
     elif isinstance(doc, (list, tuple)):
         return [sanitize_json(v) for v in doc]
     else:
         raise TypeError(f"Unexpected object type in sanitize_json: {doc}")


 def get_indexer_storage(cls: str, **kwargs) -> IndexerStorageInterface:
     """Instantiate an indexer storage implementation of class `cls` with
     arguments `kwargs`.

     Args:
         cls: indexer storage class (local, remote or memory)
         kwargs: dictionary of arguments passed to the
             indexer storage class constructor

     Returns:
         an instance of swh.indexer.storage

     Raises:
         ValueError if passed an unknown storage class.

     """
     if "args" in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning,
         )
         kwargs = kwargs["args"]

     class_path = SERVER_IMPLEMENTATIONS.get(cls)
     if class_path is None:
         raise ValueError(
             f"Unknown indexer storage class `{cls}`. "
             f"Supported: {', '.join(SERVER_IMPLEMENTATIONS)}"
         )

     (module_path, class_name) = class_path.rsplit(".", 1)
     module = import_module(module_path if module_path else ".", package=__package__)
     BackendClass = getattr(module, class_name)
     check_config = kwargs.pop("check_config", {})
     idx_storage = BackendClass(**kwargs)
     if check_config:
         if not idx_storage.check_config(**check_config):
             raise EnvironmentError("Indexer storage check config failed")
     return idx_storage


 def check_id_duplicates(data):
     """
     If any two row models in `data` have the same unique key, raises
     a `DuplicateId` error.

     Values associated to the key must be hashable.

     Args:
         data (List[dict]): List of dictionaries to be inserted

     >>> check_id_duplicates([
     ...     ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="GPL"),
     ...     ContentLicenseRow(id=b'foo', indexer_configuration_id=32, license="GPL"),
     ... ])
     >>> check_id_duplicates([
     ...     ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"),
     ...     ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"),
     ... ])
     Traceback (most recent call last):
       ...
     swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42, 'license': 'AGPL'}]

     """  # noqa
     counter = Counter(tuple(sorted(item.unique_key().items())) for item in data)
     duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
     if duplicates:
         raise DuplicateId(list(map(dict, duplicates)))


 class IndexerStorage:
     """SWH Indexer Storage Datastore"""

-    current_version = 135
+    current_version = 136

     def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None):
         """
         Args:
             db: either a libpq connection string, or a psycopg2 connection
             journal_writer: configuration passed to
                 `swh.journal.writer.get_journal_writer`

         """
         self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer)
         try:
             if isinstance(db, psycopg2.extensions.connection):
                 self._pool = None
                 self._db = Db(db)
             else:
                 self._pool = psycopg2.pool.ThreadedConnectionPool(
                     min_pool_conns, max_pool_conns, db
                 )
                 self._db = None
         except psycopg2.OperationalError as e:
             raise StorageDBError(e)

     def get_db(self):
         if self._db:
             return self._db
         return Db.from_pool(self._pool)

     def put_db(self, db):
         if db is not self._db:
             db.put_conn()

     @timed
     @db_transaction()
     def check_config(self, *, check_write, db=None, cur=None):
         # Check permissions on one of the tables
         if check_write:
             check = "INSERT"
         else:
             check = "SELECT"

         cur.execute(
             "select has_table_privilege(current_user, 'content_mimetype', %s)",  # noqa
             (check,),
         )
         return cur.fetchone()[0]

     @timed
     @db_transaction()
     def content_mimetype_missing(
         self, mimetypes: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [
             obj[0] for obj in db.content_mimetype_missing_from_list(mimetypes, cur)
         ]

     @timed
     @db_transaction()
     def get_partition(
         self,
         indexer_type: str,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         with_textual_data=False,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         """Retrieve ids of content with `indexer_type` within partition
         partition_id, bound by limit.

         Args:
             **indexer_type**: Type of data content to index (mimetype, etc...)
             **indexer_configuration_id**: The tool used to index data
             **partition_id**: index of the partition to fetch
             **nb_partitions**: total number of partitions to split into
             **page_token**: opaque token used for pagination
             **limit**: Limit result (default to 1000)
             **with_textual_data** (bool): Deal with only textual
                 content (True) or all content (all contents by
                 default, False)

         Raises:
             IndexerStorageArgumentException for:

             - limit set to None
             - wrong indexer_type provided

         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more
             data to fetch

         """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
         if indexer_type not in db.content_indexer_names:
             err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
             raise IndexerStorageArgumentException(err)

         start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
         if page_token is not None:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE

         next_page_token: Optional[str] = None
         ids = [
             row[0]
             for row in db.content_get_range(
                 indexer_type,
                 start,
                 end,
                 indexer_configuration_id,
                 limit=limit + 1,
                 with_textual_data=with_textual_data,
                 cur=cur,
             )
         ]

         if len(ids) >= limit:
             next_page_token = hash_to_hex(ids[-1])
             ids = ids[:limit]

         assert len(ids) <= limit
         return PagedResult(results=ids, next_page_token=next_page_token)

     @timed
     @db_transaction()
     def content_mimetype_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         return self.get_partition(
             "mimetype",
             indexer_configuration_id,
             partition_id,
             nb_partitions,
             page_token=page_token,
             limit=limit,
             db=db,
             cur=cur,
         )

     @timed
     @process_metrics
     @db_transaction()
     def content_mimetype_add(
         self,
         mimetypes: List[ContentMimetypeRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(mimetypes)
-        mimetypes.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("content_mimetype", mimetypes)
         db.mktemp_content_mimetype(cur)
         db.copy_to(
             [m.to_dict() for m in mimetypes],
             "tmp_content_mimetype",
             ["id", "mimetype", "encoding", "indexer_configuration_id"],
             cur,
         )
         count = db.content_mimetype_add_from_temp(cur)
         return {"content_mimetype:add": count}

     @timed
     @db_transaction()
     def content_mimetype_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentMimetypeRow]:
         return [
             ContentMimetypeRow.from_dict(
                 converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c)))
             )
             for c in db.content_mimetype_get_from_list(ids, cur)
         ]

     @timed
     @db_transaction()
     def content_fossology_license_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentLicenseRow]:
         return [
             ContentLicenseRow.from_dict(
                 converters.db_to_fossology_license(
                     dict(zip(db.content_fossology_license_cols, c))
                 )
             )
             for c in db.content_fossology_license_get_from_list(ids, cur)
         ]

     @timed
     @process_metrics
     @db_transaction()
     def content_fossology_license_add(
         self,
         licenses: List[ContentLicenseRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(licenses)
-        licenses.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("content_fossology_license", licenses)
         db.mktemp_content_fossology_license(cur)
         db.copy_to(
             [license.to_dict() for license in licenses],
             tblname="tmp_content_fossology_license",
             columns=["id", "license", "indexer_configuration_id"],
             cur=cur,
         )
         count = db.content_fossology_license_add_from_temp(cur)
         return {"content_fossology_license:add": count}

     @timed
     @db_transaction()
     def content_fossology_license_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
         db=None,
         cur=None,
     ) -> PagedResult[Sha1]:
         return self.get_partition(
             "fossology_license",
             indexer_configuration_id,
             partition_id,
             nb_partitions,
             page_token=page_token,
             limit=limit,
             with_textual_data=True,
             db=db,
             cur=cur,
         )

     @timed
     @db_transaction()
     def content_metadata_missing(
         self, metadata: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [
             obj[0] for obj in db.content_metadata_missing_from_list(metadata, cur)
         ]

     @timed
     @db_transaction()
     def content_metadata_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[ContentMetadataRow]:
         return [
             ContentMetadataRow.from_dict(
                 converters.db_to_metadata(dict(zip(db.content_metadata_cols, c)))
             )
             for c in db.content_metadata_get_from_list(ids, cur)
         ]

     @timed
     @process_metrics
     @db_transaction()
     def content_metadata_add(
         self,
         metadata: List[ContentMetadataRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
-        metadata.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("content_metadata", metadata)

         db.mktemp_content_metadata(cur)

         rows = [m.to_dict() for m in metadata]
         for row in rows:
             row["metadata"] = sanitize_json(row["metadata"])

         db.copy_to(
             rows,
             "tmp_content_metadata",
             ["id", "metadata", "indexer_configuration_id"],
             cur,
         )
         count = db.content_metadata_add_from_temp(cur)
         return {
             "content_metadata:add": count,
         }

     @timed
     @db_transaction()
     def directory_intrinsic_metadata_missing(
         self, metadata: Iterable[Dict], db=None, cur=None
     ) -> List[Tuple[Sha1, int]]:
         return [
             obj[0]
             for obj in db.directory_intrinsic_metadata_missing_from_list(metadata, cur)
         ]

     @timed
     @db_transaction()
     def directory_intrinsic_metadata_get(
         self, ids: Iterable[Sha1], db=None, cur=None
     ) -> List[DirectoryIntrinsicMetadataRow]:
         return [
             DirectoryIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.directory_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.directory_intrinsic_metadata_get_from_list(ids, cur)
         ]

     @timed
     @process_metrics
     @db_transaction()
     def directory_intrinsic_metadata_add(
         self,
         metadata: List[DirectoryIntrinsicMetadataRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
-        metadata.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("directory_intrinsic_metadata", metadata)

         db.mktemp_directory_intrinsic_metadata(cur)

         rows = [m.to_dict() for m in metadata]
         for row in rows:
             row["metadata"] = sanitize_json(row["metadata"])

         db.copy_to(
             rows,
             "tmp_directory_intrinsic_metadata",
             ["id", "metadata", "mappings", "indexer_configuration_id"],
             cur,
         )
         count = db.directory_intrinsic_metadata_add_from_temp(cur)
         return {
             "directory_intrinsic_metadata:add": count,
         }

     @timed
     @db_transaction()
     def origin_intrinsic_metadata_get(
         self, urls: Iterable[str], db=None, cur=None
     ) -> List[OriginIntrinsicMetadataRow]:
         return [
             OriginIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.origin_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.origin_intrinsic_metadata_get_from_list(urls, cur)
         ]

     @timed
     @process_metrics
     @db_transaction()
     def origin_intrinsic_metadata_add(
         self,
         metadata: List[OriginIntrinsicMetadataRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
-        metadata.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("origin_intrinsic_metadata", metadata)

         db.mktemp_origin_intrinsic_metadata(cur)

         rows = [m.to_dict() for m in metadata]
         for row in rows:
             row["metadata"] = sanitize_json(row["metadata"])

         db.copy_to(
             rows,
             "tmp_origin_intrinsic_metadata",
             [
                 "id",
                 "metadata",
                 "indexer_configuration_id",
                 "from_directory",
                 "mappings",
             ],
             cur,
         )
         count = db.origin_intrinsic_metadata_add_from_temp(cur)
         return {
             "origin_intrinsic_metadata:add": count,
         }

     @timed
     @db_transaction()
     def origin_intrinsic_metadata_search_fulltext(
         self, conjunction: List[str], limit: int = 100, db=None, cur=None
     ) -> List[OriginIntrinsicMetadataRow]:
         return [
             OriginIntrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.origin_intrinsic_metadata_cols, c))
                 )
             )
             for c in db.origin_intrinsic_metadata_search_fulltext(
                 conjunction, limit=limit, cur=cur
             )
         ]
     @timed
     @db_transaction()
     def origin_intrinsic_metadata_search_by_producer(
         self,
         page_token: str = "",
         limit: int = 100,
         ids_only: bool = False,
         mappings: Optional[List[str]] = None,
         tool_ids: Optional[List[int]] = None,
         db=None,
         cur=None,
     ) -> PagedResult[Union[str, OriginIntrinsicMetadataRow]]:
         assert isinstance(page_token, str)
         # we go to limit+1 to check whether we should add next_page_token in
         # the response
         rows = db.origin_intrinsic_metadata_search_by_producer(
             page_token, limit + 1, ids_only, mappings, tool_ids, cur
         )
         next_page_token = None
         if ids_only:
             results = [origin for (origin,) in rows]
             if len(results) > limit:
                 results[limit:] = []
                 next_page_token = results[-1]
         else:
             results = [
                 OriginIntrinsicMetadataRow.from_dict(
                     converters.db_to_metadata(
                         dict(zip(db.origin_intrinsic_metadata_cols, row))
                     )
                 )
                 for row in rows
             ]
             if len(results) > limit:
                 results[limit:] = []
                 next_page_token = results[-1].id

         return PagedResult(
             results=results,
             next_page_token=next_page_token,
         )

     @timed
     @db_transaction()
     def origin_intrinsic_metadata_stats(self, db=None, cur=None):
         mapping_names = [m for m in MAPPING_NAMES]
         select_parts = []

         # Count rows for each mapping
         for mapping_name in mapping_names:
             select_parts.append(
                 (
                     "sum(case when (mappings @> ARRAY['%s']) "
                     "         then 1 else 0 end)"
                 )
                 % mapping_name
             )

         # Total
         select_parts.append("sum(1)")

         # Rows whose metadata has at least one key that is not '@context'
         select_parts.append(
             "sum(case when ('{}'::jsonb @> (metadata - '@context')) "
             "         then 0 else 1 end)"
         )
         cur.execute(
             "select " + ", ".join(select_parts) + " from origin_intrinsic_metadata"
         )
         results = dict(zip(mapping_names + ["total", "non_empty"], cur.fetchone()))
         return {
             "total": results.pop("total"),
             "non_empty": results.pop("non_empty"),
             "per_mapping": results,
         }

     @timed
     @db_transaction()
     def origin_extrinsic_metadata_get(
         self, urls: Iterable[str], db=None, cur=None
     ) -> List[OriginExtrinsicMetadataRow]:
         return [
             OriginExtrinsicMetadataRow.from_dict(
                 converters.db_to_metadata(
                     dict(zip(db.origin_extrinsic_metadata_cols, c))
                 )
             )
             for c in db.origin_extrinsic_metadata_get_from_list(urls, cur)
         ]

     @timed
     @process_metrics
     @db_transaction()
     def origin_extrinsic_metadata_add(
         self,
         metadata: List[OriginExtrinsicMetadataRow],
         db=None,
         cur=None,
     ) -> Dict[str, int]:
         check_id_duplicates(metadata)
-        metadata.sort(key=lambda m: m.id)
         self.journal_writer.write_additions("origin_extrinsic_metadata", metadata)

         db.mktemp_origin_extrinsic_metadata(cur)

         rows = [m.to_dict() for m in metadata]
         for row in rows:
             row["metadata"] = sanitize_json(row["metadata"])

         db.copy_to(
             rows,
             "tmp_origin_extrinsic_metadata",
             [
                 "id",
                 "metadata",
                 "indexer_configuration_id",
                 "from_remd_id",
                 "mappings",
             ],
             cur,
         )
         count = db.origin_extrinsic_metadata_add_from_temp(cur)
         return {
             "origin_extrinsic_metadata:add": count,
         }

     @timed
     @db_transaction()
     def indexer_configuration_add(self, tools, db=None, cur=None):
         db.mktemp_indexer_configuration(cur)
         db.copy_to(
             tools,
             "tmp_indexer_configuration",
             ["tool_name", "tool_version", "tool_configuration"],
             cur,
         )

         tools = db.indexer_configuration_add_from_temp(cur)
         results = [dict(zip(db.indexer_configuration_cols, line)) for line in tools]
         send_metric(
             "indexer_configuration:add",
             len(results),
             method_name="indexer_configuration_add",
         )
         return results

     @timed
     @db_transaction()
     def indexer_configuration_get(self, tool, db=None, cur=None):
         tool_conf = tool["tool_configuration"]
         if isinstance(tool_conf, dict):
             tool_conf = json.dumps(tool_conf)
         idx = db.indexer_configuration_get(
             tool["tool_name"], tool["tool_version"], tool_conf
         )
         if not idx:
             return None
         return dict(zip(db.indexer_configuration_cols, idx))
     @db_transaction()
     def _tool_get_from_id(self, id_, db, cur):
         tool = dict(
             zip(
                 db.indexer_configuration_cols,
                 db.indexer_configuration_get_from_id(id_, cur),
             )
         )
         return {
             "id": tool["id"],
             "name": tool["tool_name"],
             "version": tool["tool_version"],
             "configuration": tool["tool_configuration"],
         }
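On the Python side, the patch drops the client-side `.sort(key=lambda m: m.id)`
calls: deduplication is still checked up front by check_id_duplicates, while
ordering is now enforced once, by the SQL "order by", at insert time. A
minimal usage sketch of the affected API, using the in-memory backend
registered above ("memory"); the tool entry and content values are made up.

    # Sketch only: exercise indexer_configuration_add and
    # content_mimetype_add against the in-memory backend.
    from swh.indexer.storage import get_indexer_storage
    from swh.indexer.storage.model import ContentMimetypeRow

    storage = get_indexer_storage("memory")
    tool = storage.indexer_configuration_add(
        [{"tool_name": "file", "tool_version": "5.22", "tool_configuration": {}}]
    )[0]
    summary = storage.content_mimetype_add(
        [
            ContentMimetypeRow(
                id=b"\x01" * 20,  # made-up sha1
                mimetype="text/plain",
                encoding="us-ascii",
                indexer_configuration_id=tool["id"],
            )
        ]
    )
    assert summary == {"content_mimetype:add": 1}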