diff --git a/sql/upgrades/131.sql b/sql/upgrades/131.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/131.sql @@ -0,0 +1,223 @@ +-- SWH Indexer DB schema upgrade +-- from_version: 130 +-- to_version: 131 +-- description: _add function returns the inserted rows + +insert into dbversion(version, release, description) + values(131, now(), 'Work In Progress'); + +drop function swh_content_mimetype_add(boolean); +drop function swh_content_language_add(boolean); +drop function swh_content_ctags_add(boolean); +drop function swh_content_fossology_license_add(boolean); +drop function swh_content_metadata_add(boolean); +drop function swh_revision_intrinsic_metadata_add(boolean); +drop function swh_origin_intrinsic_metadata_add(boolean); + +create or replace function swh_content_mimetype_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) + select id, mimetype, encoding, indexer_configuration_id + from tmp_content_mimetype tcm + on conflict(id, indexer_configuration_id) + do update set mimetype = excluded.mimetype, + encoding = excluded.encoding; + else + insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) + select id, mimetype, encoding, indexer_configuration_id + from tmp_content_mimetype tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_mimetype_add(boolean) IS 'Add new content mimetypes'; + +create or replace function swh_content_language_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_language (id, lang, indexer_configuration_id) + select id, lang, indexer_configuration_id + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do update set 
lang = excluded.lang; + else + insert into content_language (id, lang, indexer_configuration_id) + select id, lang, indexer_configuration_id + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_language_add(boolean) IS 'Add new content languages'; + + +create or replace function swh_content_ctags_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + delete from content_ctags + where id in (select tmp.id + from tmp_content_ctags tmp + inner join indexer_configuration i on i.id=tmp.indexer_configuration_id); + end if; + + insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id) + select id, name, kind, line, lang, indexer_configuration_id + from tmp_content_ctags tct + on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) + do nothing; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_ctags_add(boolean) IS 'Add new ctags symbols per content'; + +create or replace function swh_content_fossology_license_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + -- insert unknown licenses first + insert into fossology_license (name) + select distinct license from tmp_content_fossology_license tmp + where not exists (select 1 from fossology_license where name=tmp.license) + on conflict(name) do nothing; + + if conflict_update then + insert into content_fossology_license (id, license_id, indexer_configuration_id) + select tcl.id, + (select id from fossology_license where name = tcl.license) as license, + indexer_configuration_id + from tmp_content_fossology_license tcl + on conflict(id, license_id, indexer_configuration_id) + do update set license_id = excluded.license_id; + end if; + + insert into content_fossology_license 
(id, license_id, indexer_configuration_id) + select tcl.id, + (select id from fossology_license where name = tcl.license) as license, + indexer_configuration_id + from tmp_content_fossology_license tcl + on conflict(id, license_id, indexer_configuration_id) + do nothing; + + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +create or replace function swh_content_metadata_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_metadata (id, metadata, indexer_configuration_id) + select id, metadata, indexer_configuration_id + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do update set metadata = excluded.metadata; + else + insert into content_metadata (id, metadata, indexer_configuration_id) + select id, metadata, indexer_configuration_id + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_metadata_add(boolean) IS 'Add new content metadata'; + + +create or replace function swh_revision_intrinsic_metadata_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) + select id, metadata, mappings, indexer_configuration_id + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + mappings = excluded.mappings; + else + insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) + select id, metadata, mappings, indexer_configuration_id + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function 
swh_revision_intrinsic_metadata_add(boolean) IS 'Add new revision intrinsic metadata'; + +create or replace function swh_origin_intrinsic_metadata_add( + conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + perform swh_origin_intrinsic_metadata_compute_tsvector(); + if conflict_update then + insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) + select id, metadata, indexer_configuration_id, from_revision, + metadata_tsvector, mappings + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + metadata_tsvector = excluded.metadata_tsvector, + mappings = excluded.mappings, + from_revision = excluded.from_revision; + else + insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) + select id, metadata, indexer_configuration_id, from_revision, + metadata_tsvector, mappings + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; diff --git a/swh/indexer/sql/30-swh-schema.sql b/swh/indexer/sql/30-swh-schema.sql --- a/swh/indexer/sql/30-swh-schema.sql +++ b/swh/indexer/sql/30-swh-schema.sql @@ -14,7 +14,7 @@ ); insert into dbversion(version, release, description) - values(130, now(), 'Work In Progress'); + values(131, now(), 'Work In Progress'); -- Computing metadata on sha1's contents -- a SHA1 checksum (not necessarily originating from Git) diff --git a/swh/indexer/sql/40-swh-func.sql b/swh/indexer/sql/40-swh-func.sql --- a/swh/indexer/sql/40-swh-func.sql +++ b/swh/indexer/sql/40-swh-func.sql @@ -47,29 +47,31 @@ -- swh_content_mimetype_missing must take place before calling this -- function. -- --- -- operates in bulk: 0. swh_mktemp(content_mimetype), 1. COPY to tmp_content_mimetype, -- 2. 
call this function create or replace function swh_content_mimetype_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) select id, mimetype, encoding, indexer_configuration_id from tmp_content_mimetype tcm - on conflict(id, indexer_configuration_id) - do update set mimetype = excluded.mimetype, - encoding = excluded.encoding; - + on conflict(id, indexer_configuration_id) + do update set mimetype = excluded.mimetype, + encoding = excluded.encoding; else insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) select id, mimetype, encoding, indexer_configuration_id from tmp_content_mimetype tcm - on conflict(id, indexer_configuration_id) do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -85,25 +87,27 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_content_language, 2. 
call this function create or replace function swh_content_language_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_language (id, lang, indexer_configuration_id) select id, lang, indexer_configuration_id - from tmp_content_language tcl - on conflict(id, indexer_configuration_id) - do update set lang = excluded.lang; - + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do update set lang = excluded.lang; else insert into content_language (id, lang, indexer_configuration_id) select id, lang, indexer_configuration_id - from tmp_content_language tcl - on conflict(id, indexer_configuration_id) - do nothing; + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -141,9 +145,11 @@ -- operates in bulk: 0. swh_mktemp(content_ctags), 1. COPY to tmp_content_ctags, -- 2. call this function create or replace function swh_content_ctags_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then delete from content_ctags @@ -155,9 +161,10 @@ insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id) select id, name, kind, line, lang, indexer_configuration_id from tmp_content_ctags tct - on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) - do nothing; - return; + on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) + do nothing; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -217,9 +224,11 @@ -- operates in bulk: 0. swh_mktemp(content_fossology_license), 1. COPY to -- tmp_content_fossology_license, 2. 
call this function create or replace function swh_content_fossology_license_add(conflict_update boolean) - returns void - language plpgsql + returns bigint + language plpgsql as $$ +declare + res bigint; begin -- insert unknown licenses first insert into fossology_license (name) @@ -233,9 +242,8 @@ (select id from fossology_license where name = tcl.license) as license, indexer_configuration_id from tmp_content_fossology_license tcl - on conflict(id, license_id, indexer_configuration_id) - do update set license_id = excluded.license_id; - return; + on conflict(id, license_id, indexer_configuration_id) + do update set license_id = excluded.license_id; end if; insert into content_fossology_license (id, license_id, indexer_configuration_id) @@ -243,14 +251,17 @@ (select id from fossology_license where name = tcl.license) as license, indexer_configuration_id from tmp_content_fossology_license tcl - on conflict(id, license_id, indexer_configuration_id) - do nothing; - return; + on conflict(id, license_id, indexer_configuration_id) + do nothing; + + get diagnostics res = ROW_COUNT; + return res; end $$; comment on function swh_content_fossology_license_add(boolean) IS 'Add new content licenses'; + -- content_metadata functions -- add tmp_content_metadata entries to content_metadata, overwriting @@ -263,25 +274,27 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_content_metadata, 2. 
call this function create or replace function swh_content_metadata_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_metadata (id, metadata, indexer_configuration_id) select id, metadata, indexer_configuration_id - from tmp_content_metadata tcm - on conflict(id, indexer_configuration_id) - do update set metadata = excluded.metadata; - + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do update set metadata = excluded.metadata; else insert into content_metadata (id, metadata, indexer_configuration_id) select id, metadata, indexer_configuration_id from tmp_content_metadata tcm - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -312,27 +325,29 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_revision_intrinsic_metadata, 2. 
call this function create or replace function swh_revision_intrinsic_metadata_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) select id, metadata, mappings, indexer_configuration_id - from tmp_revision_intrinsic_metadata tcm - on conflict(id, indexer_configuration_id) - do update set - metadata = excluded.metadata, - mappings = excluded.mappings; - + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + mappings = excluded.mappings; else insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) select id, metadata, mappings, indexer_configuration_id from tmp_revision_intrinsic_metadata tcm - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -408,32 +423,34 @@ -- tmp_origin_intrinsic_metadata, 2. 
call this function create or replace function swh_origin_intrinsic_metadata_add( conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin perform swh_origin_intrinsic_metadata_compute_tsvector(); if conflict_update then insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) select id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings - from tmp_origin_intrinsic_metadata - on conflict(id, indexer_configuration_id) - do update set - metadata = excluded.metadata, - metadata_tsvector = excluded.metadata_tsvector, - mappings = excluded.mappings, - from_revision = excluded.from_revision; - + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + metadata_tsvector = excluded.metadata_tsvector, + mappings = excluded.mappings, + from_revision = excluded.from_revision; else insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) select id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings from tmp_origin_intrinsic_metadata - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -9,9 +9,11 @@ import psycopg2.pool from collections import defaultdict, Counter +from typing import Dict, List from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError +from swh.storage.metrics import send_metric, timed, process_metrics from . 
import converters from .db import Db @@ -165,16 +167,31 @@ indexer_configuration_id, limit=limit, db=db, cur=cur) + @timed + @process_metrics @db_transaction() - def content_mimetype_add(self, mimetypes, conflict_update=False, db=None, - cur=None): + def content_mimetype_add( + self, mimetypes: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: + """Add mimetypes to the storage (if conflict_update is True, this will + override existing data if any). + + Returns: + A dict with the number of new elements added to the storage. + + """ check_id_duplicates(mimetypes) mimetypes.sort(key=lambda m: m['id']) db.mktemp_content_mimetype(cur) db.copy_to(mimetypes, 'tmp_content_mimetype', ['id', 'mimetype', 'encoding', 'indexer_configuration_id'], cur) - db.content_mimetype_add_from_temp(conflict_update, cur) + count = db.content_mimetype_add_from_temp(conflict_update, cur) + send_metric('content_mimetype:add', + count=count, method_name='content_mimetype_add') + return { + 'content_mimetype:add': count + } @db_transaction_generator() def content_mimetype_get(self, ids, db=None, cur=None): @@ -193,9 +210,12 @@ yield converters.db_to_language( dict(zip(db.content_language_cols, c))) + @timed + @process_metrics @db_transaction() - def content_language_add(self, languages, conflict_update=False, db=None, - cur=None): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False, db=None, cur=None) -> Dict: check_id_duplicates(languages) languages.sort(key=lambda m: m['id']) db.mktemp_content_language(cur) @@ -209,7 +229,12 @@ 'tmp_content_language', ['id', 'lang', 'indexer_configuration_id'], cur) - db.content_language_add_from_temp(conflict_update, cur) + count = db.content_language_add_from_temp(conflict_update, cur) + send_metric('content_language:add', + count=count, method_name='content_language_add') + return { + 'content_language:add': count + } @db_transaction_generator() def content_ctags_missing(self, ctags, db=None, 
cur=None): @@ -221,9 +246,12 @@ for c in db.content_ctags_get_from_list(ids, cur): yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c))) + @timed + @process_metrics @db_transaction() - def content_ctags_add(self, ctags, conflict_update=False, db=None, - cur=None): + def content_ctags_add( + self, ctags: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(ctags) ctags.sort(key=lambda m: m['id']) @@ -241,7 +269,12 @@ 'lang', 'indexer_configuration_id'], cur=cur) - db.content_ctags_add_from_temp(conflict_update, cur) + count = db.content_ctags_add_from_temp(conflict_update, cur) + send_metric('content_ctags:add', + count=count, method_name='content_ctags_add') + return { + 'content_ctags:add': count + } @db_transaction_generator() def content_ctags_search(self, expression, @@ -262,9 +295,12 @@ for id_, facts in d.items(): yield {id_: facts} + @timed + @process_metrics @db_transaction() - def content_fossology_license_add(self, licenses, conflict_update=False, - db=None, cur=None): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(licenses) licenses.sort(key=lambda m: m['id']) db.mktemp_content_fossology_license(cur) @@ -278,7 +314,13 @@ tblname='tmp_content_fossology_license', columns=['id', 'license', 'indexer_configuration_id'], cur=cur) - db.content_fossology_license_add_from_temp(conflict_update, cur) + count = db.content_fossology_license_add_from_temp( + conflict_update, cur) + send_metric('content_fossology_license:add', + count=count, method_name='content_fossology_license_add') + return { + 'content_fossology_license:add': count + } @db_transaction() def content_fossology_license_get_range( @@ -299,9 +341,12 @@ yield converters.db_to_metadata( dict(zip(db.content_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def content_metadata_add(self, metadata, conflict_update=False, db=None, - 
cur=None): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -310,7 +355,12 @@ db.copy_to(metadata, 'tmp_content_metadata', ['id', 'metadata', 'indexer_configuration_id'], cur) - db.content_metadata_add_from_temp(conflict_update, cur) + count = db.content_metadata_add_from_temp(conflict_update, cur) + send_metric('content_metadata:add', + count=count, method_name='content_metadata_add') + return { + 'content_metadata:add': count, + } @db_transaction_generator() def revision_intrinsic_metadata_missing(self, metadata, db=None, cur=None): @@ -324,9 +374,12 @@ yield converters.db_to_metadata( dict(zip(db.revision_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False, - db=None, cur=None): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -336,11 +389,23 @@ ['id', 'metadata', 'mappings', 'indexer_configuration_id'], cur) - db.revision_intrinsic_metadata_add_from_temp(conflict_update, cur) + count = db.revision_intrinsic_metadata_add_from_temp( + conflict_update, cur) + send_metric('revision_intrinsic_metadata:add', + count=count, method_name='revision_intrinsic_metadata_add') + return { + 'revision_intrinsic_metadata:add': count, + } + @timed + @process_metrics @db_transaction() - def revision_intrinsic_metadata_delete(self, entries, db=None, cur=None): - db.revision_intrinsic_metadata_delete(entries, cur) + def revision_intrinsic_metadata_delete( + self, entries: List[Dict], db=None, cur=None) -> Dict: + count = db.revision_intrinsic_metadata_delete(entries, cur) + return { + 'revision_intrinsic_metadata:del': count + } @db_transaction_generator() def origin_intrinsic_metadata_get(self, 
ids, db=None, cur=None): @@ -348,10 +413,12 @@ yield converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def origin_intrinsic_metadata_add(self, metadata, - conflict_update=False, db=None, - cur=None): + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -362,12 +429,23 @@ 'indexer_configuration_id', 'from_revision', 'mappings'], cur) - db.origin_intrinsic_metadata_add_from_temp(conflict_update, cur) + count = db.origin_intrinsic_metadata_add_from_temp( + conflict_update, cur) + send_metric('content_origin_intrinsic:add', + count=count, method_name='content_origin_intrinsic_add') + return { + 'origin_intrinsic_metadata:add': count, + } + @timed + @process_metrics @db_transaction() def origin_intrinsic_metadata_delete( - self, entries, db=None, cur=None): - db.origin_intrinsic_metadata_delete(entries, cur) + self, entries: List[Dict], db=None, cur=None) -> Dict: + count = db.origin_intrinsic_metadata_delete(entries, cur) + return { + 'origin_intrinsic_metadata:del': count, + } @db_transaction_generator() def origin_intrinsic_metadata_search_fulltext( diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -60,8 +60,10 @@ def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_mimetype_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute('select * from swh_content_mimetype_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def _convert_key(self, key, main_table='c'): """Convert keys according to specific use in the module. 
@@ -176,8 +178,10 @@ def mktemp_content_language(self, cur=None): pass def content_language_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_language_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute('select * from swh_content_language_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_language_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -201,8 +205,11 @@ def mktemp_content_ctags(self, cur=None): pass def content_ctags_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_ctags_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_ctags_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_ctags_get_from_list(self, ids, cur=None): cur = self._cursor(cur) @@ -252,9 +259,11 @@ """Add new licenses per content. """ - self._cursor(cur).execute( - "SELECT swh_content_fossology_license_add(%s)", + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_fossology_license_add(%s)', (conflict_update, )) + return cur.fetchone()[0] def content_fossology_license_get_from_list(self, ids, cur=None): """Retrieve licenses per id. 
@@ -293,8 +302,11 @@ def mktemp_content_metadata(self, cur=None): pass def content_metadata_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_metadata_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -321,9 +333,11 @@ def revision_intrinsic_metadata_add_from_temp( self, conflict_update, cur=None): - self._cursor(cur).execute( - "SELECT swh_revision_intrinsic_metadata_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_revision_intrinsic_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def revision_intrinsic_metadata_delete( self, entries, cur=None): @@ -331,9 +345,11 @@ cur.execute( "DELETE from revision_intrinsic_metadata " "WHERE (id, indexer_configuration_id) IN " - " (VALUES %s)" % (', '.join('%s' for _ in entries)), + " (VALUES %s) " + "RETURNING id" % (', '.join('%s' for _ in entries)), tuple((e['id'], e['indexer_configuration_id']) for e in entries),) + return len(cur.fetchall()) def revision_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -358,8 +374,9 @@ self, conflict_update, cur=None): cur = self._cursor(cur) cur.execute( - "SELECT swh_origin_intrinsic_metadata_add(%s)", - (conflict_update, )) + 'select * from swh_origin_intrinsic_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def origin_intrinsic_metadata_delete( self, entries, cur=None): @@ -367,9 +384,11 @@ cur.execute( "DELETE from origin_intrinsic_metadata " "WHERE (id, indexer_configuration_id) IN" - " (VALUES %s)" % (', '.join('%s' for _ in entries)), + " (VALUES %s) " + "RETURNING id" % (', '.join('%s' for _ in entries)), tuple((e['id'], e['indexer_configuration_id']) for e in entries),) + return len(cur.fetchall()) 
def origin_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -123,7 +123,7 @@ 'next': None, } - def add(self, data, conflict_update): + def add(self, data: List[Dict], conflict_update: bool) -> int: """Add data not present in storage. Args: @@ -140,22 +140,27 @@ """ data = list(data) check_id_duplicates(data) + count = 0 for item in data: item = item.copy() tool_id = item.pop('indexer_configuration_id') id_ = item.pop('id') - data = item + data_item = item if not conflict_update and \ tool_id in self._tools_per_id.get(id_, set()): # Duplicate, should not be updated continue key = (id_, tool_id) - self._data[key] = data + self._data[key] = data_item self._tools_per_id[id_].add(tool_id) + count += 1 if id_ not in self._sorted_ids: bisect.insort(self._sorted_ids, id_) + return count - def add_merge(self, new_data, conflict_update, merged_key): + def add_merge(self, new_data: List[Dict], conflict_update: bool, + merged_key: str) -> int: + added = 0 for new_item in new_data: id_ = new_item['id'] tool_id = new_item['indexer_configuration_id'] @@ -172,7 +177,7 @@ for new_subitem in new_item[merged_key]: if new_subitem not in all_subitems: all_subitems.append(new_subitem) - self.add([ + added += self.add([ { 'id': id_, 'indexer_configuration_id': tool_id, @@ -181,15 +186,22 @@ ], conflict_update=True) if id_ not in self._sorted_ids: bisect.insort(self._sorted_ids, id_) + return added - def delete(self, entries): + def delete(self, entries: List[Dict]) -> int: + 
"""Delete entries and return the number of entries deleted. + + """ + deleted = 0 for entry in entries: (id_, tool_id) = (entry['id'], entry['indexer_configuration_id']) key = (id_, tool_id) if tool_id in self._tools_per_id[id_]: self._tools_per_id[id_].remove(tool_id) if key in self._data: + deleted += 1 del self._data[key] + return deleted class IndexerStorage: @@ -216,9 +228,12 @@ return self._mimetypes.get_range( start, end, indexer_configuration_id, limit) - def content_mimetype_add(self, mimetypes, conflict_update=False): + def content_mimetype_add( + self, mimetypes: List[Dict], + conflict_update: bool = False) -> Dict: check_id_types(mimetypes) - self._mimetypes.add(mimetypes, conflict_update) + added = self._mimetypes.add(mimetypes, conflict_update) + return {'content_mimetype:add': added} def content_mimetype_get(self, ids): yield from self._mimetypes.get(ids) @@ -229,9 +244,12 @@ def content_language_get(self, ids): yield from self._languages.get(ids) - def content_language_add(self, languages, conflict_update=False): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False) -> Dict: check_id_types(languages) - self._languages.add(languages, conflict_update) + added = self._languages.add(languages, conflict_update) + return {'content_language:add': added} def content_ctags_missing(self, ctags): yield from self._content_ctags.missing(ctags) @@ -245,9 +263,11 @@ **item_ctags_item } - def content_ctags_add(self, ctags, conflict_update=False): + def content_ctags_add( + self, ctags: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(ctags) - self._content_ctags.add_merge(ctags, conflict_update, 'ctags') + added = self._content_ctags.add_merge(ctags, conflict_update, 'ctags') + return {'content_ctags:add': added} def content_ctags_search(self, expression, limit=10, last_sha1=None): @@ -279,9 +299,11 @@ for (id_, facts) in res.items(): yield {id_: facts} - def content_fossology_license_add(self, licenses, 
conflict_update=False): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(licenses) - self._licenses.add_merge(licenses, conflict_update, 'licenses') + added = self._licenses.add_merge(licenses, conflict_update, 'licenses') + return {'fossology_license_add:add': added} def content_fossology_license_get_range( self, start, end, indexer_configuration_id, limit=1000): @@ -294,9 +316,11 @@ def content_metadata_get(self, ids): yield from self._content_metadata.get(ids) - def content_metadata_add(self, metadata, conflict_update=False): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(metadata) - self._content_metadata.add(metadata, conflict_update) + added = self._content_metadata.add(metadata, conflict_update) + return {'content_metadata:add': added} def revision_intrinsic_metadata_missing(self, metadata): yield from self._revision_intrinsic_metadata.missing(metadata) @@ -304,22 +328,28 @@ def revision_intrinsic_metadata_get(self, ids): yield from self._revision_intrinsic_metadata.get(ids) - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(metadata) - self._revision_intrinsic_metadata.add(metadata, conflict_update) + added = self._revision_intrinsic_metadata.add( + metadata, conflict_update) + return {'revision_intrinsic_metadata:add': added} - def revision_intrinsic_metadata_delete(self, entries): - self._revision_intrinsic_metadata.delete(entries) + def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: + deleted = self._revision_intrinsic_metadata.delete(entries) + return {'revision_intrinsic_metadata:del': deleted} def origin_intrinsic_metadata_get(self, ids): yield from self._origin_intrinsic_metadata.get(ids) - def origin_intrinsic_metadata_add(self, metadata, - 
conflict_update=False): - self._origin_intrinsic_metadata.add(metadata, conflict_update) + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + added = self._origin_intrinsic_metadata.add(metadata, conflict_update) + return {'origin_intrinsic_metadata:add': added} - def origin_intrinsic_metadata_delete(self, entries): - self._origin_intrinsic_metadata.delete(entries) + def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: + deleted = self._origin_intrinsic_metadata.delete(entries) + return {'origin_intrinsic_metadata:del': deleted} def origin_intrinsic_metadata_search_fulltext( self, conjunction, limit=100): diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py --- a/swh/indexer/storage/interface.py +++ b/swh/indexer/storage/interface.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Dict, List from swh.core.api import remote_api_endpoint @@ -89,7 +90,8 @@ ... @remote_api_endpoint('content_mimetype/add') - def content_mimetype_add(self, mimetypes, conflict_update=False): + def content_mimetype_add(self, mimetypes: List[Dict], + conflict_update: bool = False) -> Dict: """Add mimetypes not present in storage. Args: @@ -104,6 +106,9 @@ overwrite (``True``) or skip duplicates (``False``, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -161,7 +166,9 @@ ... @remote_api_endpoint('content_language/add') - def content_language_add(self, languages, conflict_update=False): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False) -> Dict: """Add languages not present in storage. Args: @@ -174,6 +181,9 @@ overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -216,7 +226,8 @@ ... 
@remote_api_endpoint('content/ctags/add') - def content_ctags_add(self, ctags, conflict_update=False): + def content_ctags_add(self, ctags: List[Dict], + conflict_update: bool = False) -> Dict: """Add ctags not present in storage Args: @@ -226,6 +237,9 @@ - **ctags** ([list): List of dictionary with keys: name, kind, line, lang + Returns: + Dict summary of number of rows added + """ ... @@ -263,7 +277,8 @@ ... @remote_api_endpoint('content/fossology_license/add') - def content_fossology_license_add(self, licenses, conflict_update=False): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False) -> Dict: """Add licenses not present in storage. Args: @@ -277,7 +292,7 @@ or skip duplicates (false, the default) Returns: - list: content_license entries which failed due to unknown licenses + Dict summary of number of rows added """ ... @@ -343,7 +358,8 @@ ... @remote_api_endpoint('content_metadata/add') - def content_metadata_add(self, metadata, conflict_update=False): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add metadata not present in storage. Args: @@ -355,6 +371,9 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -395,7 +414,8 @@ ... @remote_api_endpoint('revision_intrinsic_metadata/add') - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add metadata not present in storage. Args: @@ -410,11 +430,14 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... 
@remote_api_endpoint('revision_intrinsic_metadata/delete') - def revision_intrinsic_metadata_delete(self, entries): + def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: """Remove revision metadata from the storage. Args: @@ -423,6 +446,9 @@ - **id** (bytes): revision identifier - **indexer_configuration_id** (int): tool used to compute metadata + + Returns: + Summary of number of rows deleted """ ... @@ -448,8 +474,8 @@ ... @remote_api_endpoint('origin_intrinsic_metadata/add') - def origin_intrinsic_metadata_add(self, metadata, - conflict_update=False): + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add origin metadata not present in storage. Args: @@ -466,12 +492,15 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @remote_api_endpoint('origin_intrinsic_metadata/delete') def origin_intrinsic_metadata_delete( - self, entries): + self, entries: List[Dict]) -> Dict: """Remove origin metadata from the storage. Args: @@ -480,6 +509,9 @@ - **id** (str): origin urls - **indexer_configuration_id** (int): tool used to compute metadata + + Returns: + Summary of number of rows deleted """ ... 
diff --git a/swh/indexer/storage/metrics.py b/swh/indexer/storage/metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/storage/metrics.py @@ -0,0 +1,79 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps +import logging + +from swh.core.statsd import statsd + +OPERATIONS_METRIC = 'swh_indexer_storage_operations_total' +OPERATIONS_UNIT_METRIC = "swh_indexer_storage_operations_{unit}_total" +DURATION_METRIC = "swh_indexer_storage_request_duration_seconds" + + +def timed(f): + """Time that function! + + """ + @wraps(f) + def d(*a, **kw): + with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): + return f(*a, **kw) + + return d + + +def send_metric(metric, count, method_name): + """Send statsd metric with count for method `method_name` + + If count is 0, the metric is discarded. If the metric is not + parseable, the metric is discarded with a log message. 
+ + Args: + metric (str): Metric's name (e.g content:add, content:add:bytes) + count (int): Associated value for the metric + method_name (str): Method's name + + Returns: + Bool to explicit if metric has been set or not + """ + if count == 0: + return False + + metric_type = metric.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = OPERATIONS_METRIC + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) + else: + logging.warning('Skipping unknown metric {%s: %s}' % ( + metric, count)) + return False + + statsd.increment( + metric_name, count, tags={ + 'endpoint': method_name, + 'object_type': object_type, + 'operation': operation, + }) + return True + + +def process_metrics(f): + """Increment object counters for the decorated function. + + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for metric, count in r.items(): + send_metric(metric=metric, count=count, method_name=f.__name__) + + return r + + return d diff --git a/swh/indexer/tests/storage/test_metrics.py b/swh/indexer/tests/storage/test_metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/storage/test_metrics.py @@ -0,0 +1,53 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import patch + +from swh.indexer.storage.metrics import ( + send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC +) + + +def test_send_metric_unknown_unit(): + r = send_metric('content', count=10, method_name='content_add') + assert r is False + r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') + assert r is False + + +def test_send_metric_no_value(): + r = send_metric('content_mimetype:add', count=0, + 
method_name='content_mimetype_add') + assert r is False + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_no_unit(mock_statsd): + r = send_metric('content_mimetype:add', count=10, + method_name='content_mimetype_add') + + mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ + 'endpoint': 'content_mimetype_add', + 'object_type': 'content_mimetype', + 'operation': 'add', + }) + + assert r + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_unit(mock_statsd): + unit_ = 'bytes' + r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') + + expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) + mock_statsd.assert_called_with( + expected_metric, 100, tags={ + 'endpoint': 'c_add', + 'object_type': 'c', + 'operation': 'add', + }) + + assert r diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,8 @@ import inspect import threading +from typing import Dict + import pytest from swh.model.hashutil import hash_to_bytes @@ -31,8 +33,39 @@ return mimetypes -def endpoint(storage, endpoint_type, endpoint_name): - return getattr(storage, endpoint_type + '_' + endpoint_name) +def endpoint_name(etype: str, ename: str) -> str: + """Compute the storage's endpoint's name + + >>> endpoint_name('content_mimetype', 'add') + 'content_mimetype_add' + >>> endpoint_name('content_fosso_license', 'delete') + 'content_fosso_license_delete' + + """ + return f'{etype}_{ename}' + + +def endpoint(storage, etype: str, ename: str): + return 
getattr(storage, endpoint_name(etype, ename)) + + +def expected_summary( + count: int, etype: str, ename: str = 'add') -> Dict[str, int]: + """Compute the expected summary + + The key is determined according to etype and ename + + >>> expected_summary(10, 'content_mimetype', 'add') + {'content_mimetype:add': 10} + >>> expected_summary(9, 'origin_intrinsic_metadata', 'delete') + {'origin_intrinsic_metadata:del': 9} + + """ + pattern = ename[0:3] + key = endpoint_name(etype, ename).replace(f'_{ename}', f':{pattern}') + return { + key: count + } class StorageETypeTester: @@ -71,12 +104,14 @@ ] # now, when we add one of them - endpoint(storage, etype, 'add')([{ + summary = endpoint(storage, etype, 'add')([{ 'id': data.sha1_2, **self.example_data[0], 'indexer_configuration_id': tool_id, }]) + assert summary == expected_summary(1, etype) + # we expect only the other one returned actual_missing = endpoint(storage, etype, 'missing')(query) assert list(actual_missing) == [data.sha1_1] @@ -92,7 +127,8 @@ **self.example_data[0], 'indexer_configuration_id': tool_id, } - endpoint(storage, etype, 'add')([data_v1]) + summary = endpoint(storage, etype, 'add')([data_v1]) + assert summary == expected_summary(1, etype) # should be able to retrieve it actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -106,7 +142,8 @@ # now if we add a modified version of the same object (same id) data_v2 = data_v1.copy() data_v2.update(self.example_data[1]) - endpoint(storage, etype, 'add')([data_v2]) + summary2 = endpoint(storage, etype, 'add')([data_v2]) + assert summary2 == expected_summary(0, etype) # not added # we expect to retrieve the original data, not the modified one actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -125,7 +162,8 @@ } # given - endpoint(storage, etype, 'add')([data_v1]) + summary = endpoint(storage, etype, 'add')([data_v1]) + assert summary == expected_summary(1, etype) # added # when actual_data = list(endpoint(storage, etype, 
'get')([data.sha1_2])) @@ -144,6 +182,7 @@ data_v2.update(self.example_data[1]) endpoint(storage, etype, 'add')([data_v2], conflict_update=True) + assert summary == expected_summary(1, etype) # FIXME: re-checks the initial add's summary; the conflict_update add's return is not captured actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -254,7 +293,8 @@ } # when - endpoint(storage, etype, 'add')([data_rev1]) + summary = endpoint(storage, etype, 'add')([data_rev1]) + assert summary == expected_summary(1, etype) with pytest.raises(DuplicateId): endpoint(storage, etype, 'add')( @@ -285,7 +325,8 @@ } # when - endpoint(storage, etype, 'add')([data1]) + summary = endpoint(storage, etype, 'add')([data1]) + assert summary == expected_summary(1, etype) # then actual_data = list(endpoint(storage, etype, 'get')(query)) @@ -843,13 +884,16 @@ } # when - endpoint(storage, etype, 'add')([data1]) - endpoint(storage, etype, 'delete')([ + summary = endpoint(storage, etype, 'add')([data1]) + assert summary == expected_summary(1, etype) + + summary2 = endpoint(storage, etype, 'delete')([ { 'id': data.sha1_2, 'indexer_configuration_id': tool['id'], } ]) + assert summary2 == expected_summary(1, etype, 'del') # then actual_data = list(endpoint(storage, etype, 'get')(query))