diff --git a/sql/upgrades/131.sql b/sql/upgrades/131.sql new file mode 100644 --- /dev/null +++ b/sql/upgrades/131.sql @@ -0,0 +1,223 @@ +-- SWH Indexer DB schema upgrade +-- from_version: 130 +-- to_version: 131 +-- description: _add function returns the inserted rows + +insert into dbversion(version, release, description) + values(131, now(), 'Work In Progress'); + +drop function swh_content_mimetype_add(boolean); +drop function swh_content_language_add(boolean); +drop function swh_content_ctags_add(boolean); +drop function swh_content_fossology_license_add(boolean); +drop function swh_content_metadata_add(boolean); +drop function swh_revision_intrinsic_metadata_add(boolean); +drop function swh_origin_intrinsic_metadata_add(boolean); + +create or replace function swh_content_mimetype_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) + select id, mimetype, encoding, indexer_configuration_id + from tmp_content_mimetype tcm + on conflict(id, indexer_configuration_id) + do update set mimetype = excluded.mimetype, + encoding = excluded.encoding; + else + insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) + select id, mimetype, encoding, indexer_configuration_id + from tmp_content_mimetype tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_mimetype_add(boolean) IS 'Add new content mimetypes'; + +create or replace function swh_content_language_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_language (id, lang, indexer_configuration_id) + select id, lang, indexer_configuration_id + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do update set lang = excluded.lang; + else + insert into content_language (id, lang, indexer_configuration_id) + select id, lang, indexer_configuration_id + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_language_add(boolean) IS 'Add new content languages'; + + +create or replace function swh_content_ctags_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + delete from content_ctags + where id in (select tmp.id + from tmp_content_ctags tmp + inner join indexer_configuration i on i.id=tmp.indexer_configuration_id); + end if; + + insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id) + select id, name, kind, line, lang, indexer_configuration_id + from tmp_content_ctags tct + on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) + do nothing; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_ctags_add(boolean) IS 'Add new ctags symbols per content'; + +create or replace function swh_content_fossology_license_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + -- insert unknown licenses first + insert into fossology_license (name) + select distinct license from tmp_content_fossology_license tmp + where not exists (select 1 from fossology_license where name=tmp.license) + on conflict(name) do nothing; + + if conflict_update then + insert into content_fossology_license (id, license_id, indexer_configuration_id) + select tcl.id, + (select id from fossology_license where name = tcl.license) as license, + indexer_configuration_id + from tmp_content_fossology_license tcl + on conflict(id, license_id, indexer_configuration_id) + do update set license_id = excluded.license_id; + end if; + + insert into content_fossology_license (id, license_id, indexer_configuration_id) + select tcl.id, + (select id from fossology_license where name = tcl.license) as license, + indexer_configuration_id + from tmp_content_fossology_license tcl + on conflict(id, license_id, indexer_configuration_id) + do nothing; + + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +create or replace function swh_content_metadata_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into content_metadata (id, metadata, indexer_configuration_id) + select id, metadata, indexer_configuration_id + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do update set metadata = excluded.metadata; + else + insert into content_metadata (id, metadata, indexer_configuration_id) + select id, metadata, indexer_configuration_id + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_content_metadata_add(boolean) IS 'Add new content metadata'; + + +create or replace function swh_revision_intrinsic_metadata_add(conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + if conflict_update then + insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) + select id, metadata, mappings, indexer_configuration_id + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + mappings = excluded.mappings; + else + insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) + select id, metadata, mappings, indexer_configuration_id + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; + +comment on function swh_revision_intrinsic_metadata_add(boolean) IS 'Add new revision intrinsic metadata'; + +create or replace function swh_origin_intrinsic_metadata_add( + conflict_update boolean) + returns bigint + language plpgsql +as $$ +declare + res bigint; +begin + perform swh_origin_intrinsic_metadata_compute_tsvector(); + if conflict_update then + insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) + select id, metadata, indexer_configuration_id, from_revision, + metadata_tsvector, mappings + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + metadata_tsvector = excluded.metadata_tsvector, + mappings = excluded.mappings, + from_revision = excluded.from_revision; + else + insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) + select id, metadata, indexer_configuration_id, from_revision, + metadata_tsvector, mappings + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do nothing; + end if; + get diagnostics res = ROW_COUNT; + return res; +end +$$; diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py --- a/swh/indexer/ctags.py +++ b/swh/indexer/ctags.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,8 @@ import subprocess import json +from typing import Dict, List + from swh.model import hashutil from .indexer import ContentIndexer, write_to_temp @@ -135,17 +137,18 @@ return ctags - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str) -> Dict: """Persist the results in storage. Args: - results ([dict]): list of content_mimetype, dict with the + results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - ctags ([dict]): ctags list of symbols - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ - self.idx_storage.content_ctags_add( + return self.idx_storage.content_ctags_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -1,17 +1,21 @@ -# Copyright (C) 2016-2018 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging import subprocess -from typing import Optional +from typing import Any, Dict, List, Optional from swh.model import hashutil from .indexer import ContentIndexer, ContentRangeIndexer, write_to_temp -def compute_license(path, log=None): +logger = logging.getLogger(__name__) + + +def compute_license(path): """Determine license from file at path. Args: @@ -38,10 +42,9 @@ 'path': path, } except subprocess.CalledProcessError: - if log: - from os import path as __path - log.exception('Problem during license detection for sha1 %s' % - __path.basename(path)) + from os import path as __path + logger.exception('Problem during license detection for sha1 %s' % + __path.basename(path)) return { 'licenses': [], 'path': path, @@ -73,7 +76,8 @@ super().prepare() self.working_directory = self.config['workdir'] - def index(self, id, data): + def index(self, id: bytes, data: Optional[bytes] = None, + **kwargs) -> Dict[str, Any]: """Index sha1s' content and store result. Args: @@ -90,33 +94,37 @@ """ assert isinstance(id, bytes) + assert data is not None + tool = self.tool # type: ignore with write_to_temp( filename=hashutil.hash_to_hex(id), # use the id as pathname data=data, working_directory=self.working_directory) as content_path: - properties = compute_license(path=content_path, log=self.log) + properties = compute_license(path=content_path) properties.update({ 'id': id, - 'indexer_configuration_id': self.tool['id'], + 'indexer_configuration_id': tool['id'], }) return properties - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str) -> Dict: """Persist the results in storage. Args: - results ([dict]): list of content_license, dict with the + results: list of content_license dict with the following keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ - self.idx_storage.content_fossology_license_add( + idx_storage = self.idx_storage # type: ignore + return idx_storage.content_fossology_license_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -10,8 +10,9 @@ import tempfile from contextlib import contextmanager -from typing import Any, Dict, Tuple, Generator, Union, List -from typing import Set +from typing import ( + Any, Dict, Iterator, List, Optional, Set, Tuple, Union +) from swh.scheduler import CONFIG as SWH_CONFIG @@ -26,8 +27,7 @@ @contextmanager def write_to_temp( - filename: str, data: bytes, working_directory: str -) -> Generator[str, None, None]: + filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: @@ -238,9 +238,8 @@ else: return [] - def index( - self, id: bytes, data: bytes - ) -> Dict[str, Any]: + def index(self, id: bytes, data: Optional[bytes] = None, + **kwargs) -> Dict[str, Any]: """Index computation for the id and associated raw data. Args: @@ -255,7 +254,7 @@ """ raise NotImplementedError() - def filter(self, ids: List[bytes]) -> Generator[bytes, None, None]: + def filter(self, ids: List[bytes]) -> Iterator[bytes]: """Filter missing ids for that particular indexer. Args: @@ -268,7 +267,7 @@ yield from ids @abc.abstractmethod - def persist_index_computations(self, results, policy_update): + def persist_index_computations(self, results, policy_update) -> Dict: """Persist the computation resulting from the index. Args: @@ -279,27 +278,10 @@ respectively update duplicates or ignore them Returns: - None + a summary dict of what has been inserted in the storage """ - pass - - @abc.abstractmethod - def run(self, ids, policy_update, **kwargs): - """Given a list of ids: - - - retrieves the data from the storage - - executes the indexing computations - - stores the results (according to policy_update) - - Args: - ids ([bytes]): id's identifier list - policy_update (str): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them - **kwargs: passed to the `index` method - - """ - pass + return {} class ContentIndexer(BaseIndexer): @@ -313,8 +295,8 @@ methods mentioned in the :class:`BaseIndexer` class. """ - - def run(self, ids, policy_update, **kwargs): + def run(self, ids: Union[List[bytes], bytes, str], policy_update: str, + **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage @@ -328,12 +310,17 @@ them **kwargs: passed to the `index` method + Returns: + A summary Dict of the task's status + """ - ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ - for id_ in ids] + status = 'uneventful' + sha1s = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ + for id_ in ids] results = [] + summary: Dict = {} try: - for sha1 in ids: + for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: @@ -343,14 +330,18 @@ res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) - - self.persist_index_computations(results, policy_update) + status = 'eventful' + summary = self.persist_index_computations(results, policy_update) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception( 'Problem when reading contents metadata.') + status = 'failed' + finally: + summary['status'] = status + return summary class ContentRangeIndexer(BaseIndexer): @@ -383,7 +374,7 @@ def _list_contents_to_index( self, start: bytes, end: bytes, indexed: Set[bytes] - ) -> Generator[bytes, None, None]: + ) -> Iterator[bytes]: """Compute from storage the new contents to index in the range [start, end]. The already indexed contents are skipped. @@ -411,7 +402,7 @@ def _index_contents( self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any - ) -> Generator[Dict, None, None]: + ) -> Iterator[Dict]: """Index the contents from within range [start, end] Args: @@ -430,7 +421,7 @@ self.log.warning('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue - res = self.index(sha1, raw_content, **kwargs) # type: ignore + res = self.index(sha1, raw_content, **kwargs) if res: if not isinstance(res['id'], bytes): raise TypeError( @@ -439,8 +430,7 @@ yield res def _index_with_skipping_already_done( - self, start: bytes, end: bytes - ) -> Generator[Dict, None, None]: + self, start: bytes, end: bytes) -> Iterator[Dict]: """Index not already indexed contents in range [start, end]. Args: @@ -460,47 +450,55 @@ start, _end, contents) start = indexed_page['next'] - def run(self, start, end, skip_existing=True, **kwargs): + def run(self, start: Union[bytes, str], end: Union[bytes, str], + skip_existing: bool = True, **kwargs) -> Dict: """Given a range of content ids, compute the indexing computations on the contents within. Either the indexer is incremental (filter out existing computed data) or not (compute everything from scratch). Args: - start (Union[bytes, str]): Starting range identifier - end (Union[bytes, str]): Ending range identifier - skip_existing (bool): Skip existing indexed data + start: Starting range identifier + end: Ending range identifier + skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: - bool: True if data was indexed, False otherwise. + A dict with the task's status """ - with_indexed_data = False + status = 'uneventful' + summary: Dict = {} try: - if isinstance(start, str): - start = hashutil.hash_to_bytes(start) - if isinstance(end, str): - end = hashutil.hash_to_bytes(end) + range_start = hashutil.hash_to_bytes(start) \ + if isinstance(start, str) else start + range_end = hashutil.hash_to_bytes(end) \ + if isinstance(end, str) else end if skip_existing: - gen = self._index_with_skipping_already_done(start, end) + gen = self._index_with_skipping_already_done( + range_start, range_end) else: - gen = self._index_contents(start, end, indexed=[]) - - for results in utils.grouper(gen, - n=self.config['write_batch_size']): - self.persist_index_computations( - results, policy_update='update-dups') - with_indexed_data = True + gen = self._index_contents( + range_start, range_end, indexed=set([])) + + for contents in utils.grouper( + gen, n=self.config['write_batch_size']): + res = self.persist_index_computations( + contents, policy_update='update-dups') + summary['content_mimetype:add'] += res.get( + 'content_mimetype:add') + status = 'eventful' except Exception: if not self.catch_exceptions: raise self.log.exception( 'Problem when computing metadata.') + status = 'failed' finally: - return with_indexed_data + summary['status'] = status + return summary class OriginIndexer(BaseIndexer): @@ -513,8 +511,8 @@ class. """ - def run(self, origin_urls, policy_update='update-dups', - next_step=None, **kwargs): + def run(self, origin_urls: List[str], + policy_update: str = 'update-dups', **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage @@ -522,17 +520,16 @@ - store the results (according to policy_update) Args: - origin_urls ([str]): list of origin urls. - policy_update (str): either 'update-dups' or 'ignore-dups' to + origin_urls: list of origin urls. + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates (default) or ignore them - parse_ids (bool): Do we need to parse id or not (default) **kwargs: passed to the `index` method """ results = self.index_list(origin_urls, **kwargs) - - self.persist_index_computations(results, policy_update) + summary = self.persist_index_computations(results, policy_update) self.results = results + return summary def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]: results = [] @@ -560,7 +557,7 @@ class. """ - def run(self, ids, policy_update): + def run(self, ids: Union[str, bytes], policy_update: str) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage @@ -568,15 +565,15 @@ - store the results (according to policy_update) Args: - ids ([bytes or str]): sha1_git's identifier list - policy_update (str): either 'update-dups' or 'ignore-dups' to + ids: sha1_git's identifier list + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] - ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ - for id_ in ids] - revs = self.storage.revision_get(ids) + revs = self.storage.revision_get( + hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ + for id_ in ids) for rev in revs: if not rev: @@ -592,5 +589,6 @@ raise self.log.exception( 'Problem when processing revision') - self.persist_index_computations(results, policy_update) + summary = self.persist_index_computations(results, policy_update) self.results = results + return summary diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -5,7 +5,7 @@ from copy import deepcopy -from typing import Any, List, Dict, Tuple, Callable, Generator +from typing import Any, Callable, Dict, Iterator, List, Tuple from swh.core.utils import grouper @@ -26,7 +26,7 @@ def call_with_batches( f: Callable[[List[Dict[str, Any]]], Dict['str', Any]], args: List[Dict[str, str]], batch_size: int -) -> Generator[str, None, None]: +) -> Iterator[str]: """Calls a function with batches of args, and concatenates the results. """ groups = grouper(args, batch_size) @@ -89,7 +89,7 @@ def persist_index_computations( self, results: List[Dict], policy_update: str - ) -> None: + ) -> Dict: """Persist the results in storage. Args: @@ -101,7 +101,7 @@ respectively update duplicates or ignore them """ - self.idx_storage.content_metadata_add( + return self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) @@ -188,7 +188,7 @@ def persist_index_computations( self, results: List[Dict], policy_update: str - ) -> None: + ) -> Dict: """Persist the results in storage. Args: @@ -203,7 +203,7 @@ """ # TODO: add functions in storage to keep data in # revision_intrinsic_metadata - self.idx_storage.revision_intrinsic_metadata_add( + return self.idx_storage.revision_intrinsic_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def translate_revision_intrinsic_metadata( @@ -327,7 +327,7 @@ def persist_index_computations( self, results: List[Dict], policy_update: str - ) -> None: + ) -> Dict: conflict_update = (policy_update == 'update-dups') # Deduplicate revisions @@ -335,6 +335,7 @@ orig_metadata: List[Any] = [] revs_to_delete: List[Any] = [] origs_to_delete: List[Any] = [] + summary: Dict = {} for (orig_item, rev_item) in results: assert rev_item['metadata'] == orig_item['metadata'] if not rev_item['metadata'] or \ @@ -352,17 +353,25 @@ orig_metadata.append(orig_item) if rev_metadata: - self.idx_storage.revision_intrinsic_metadata_add( + summary_rev = self.idx_storage.revision_intrinsic_metadata_add( rev_metadata, conflict_update=conflict_update) + summary.update(summary_rev) if orig_metadata: - self.idx_storage.origin_intrinsic_metadata_add( + summary_ori = self.idx_storage.origin_intrinsic_metadata_add( orig_metadata, conflict_update=conflict_update) + summary.update(summary_ori) # revs_to_delete should always be empty unless we changed a mapping # to detect less files or less content. # However, origs_to_delete may be empty whenever an upstream deletes # a metadata file. if origs_to_delete: - self.idx_storage.origin_intrinsic_metadata_delete(origs_to_delete) + summary_ori = self.idx_storage.origin_intrinsic_metadata_delete( + origs_to_delete) + summary.update(summary_ori) if revs_to_delete: - self.idx_storage.revision_intrinsic_metadata_delete(revs_to_delete) + summary_rev = self.idx_storage.revision_intrinsic_metadata_delete( + revs_to_delete) + summary.update(summary_ori) + + return summary diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -55,7 +55,8 @@ CONFIG_BASE_FILENAME = 'indexer/mimetype' # type: Optional[str] - def index(self, id: bytes, data: bytes) -> Dict[str, Any]: + def index(self, id: bytes, data: Optional[bytes] = None, + **kwargs) -> Dict[str, Any]: """Index sha1s' content and store result. Args: @@ -70,6 +71,7 @@ - encoding: encoding in bytes """ + assert data is not None properties = compute_mimetype_encoding(data) properties.update({ 'id': id, @@ -78,8 +80,8 @@ return properties def persist_index_computations( - self, results: List[Dict], policy_update: List[str] - ) -> None: + self, results: List[Dict], policy_update: str + ) -> Dict: """Persist the results in storage. Args: @@ -90,7 +92,7 @@ respectively update duplicates or ignore them """ - self.idx_storage.content_mimetype_add( + return self.idx_storage.content_mimetype_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -24,10 +24,10 @@ def persist_index_computations( self, results: Any, policy_update: str - ) -> None: + ) -> Dict: """Do nothing. The indexer's results are not persistent, they should only be piped to another indexer.""" - pass + return {} # Dispatch diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -145,19 +145,25 @@ content.update(content_hashes) yield content, checksums_to_compute - def run(self, contents: List[Dict[str, Any]]) -> None: + def run(self, contents: List[Dict[str, Any]]) -> Dict: """Given a list of content: - (re)compute a given set of checksums on contents available in our object storage - update those contents with the new metadata - Args: - contents: contents as dictionary with necessary keys. - key present in such dictionary should be the ones defined in - the 'primary_key' option. + Args: + contents: contents as dictionary with necessary keys. + key present in such dictionary should be the ones defined in + the 'primary_key' option. + + Returns: + A summary dict with key 'status', task' status and 'count' the + number of updated contents. """ + status = 'uneventful' + count = 0 for data in utils.grouper( self.get_new_contents_metadata(contents), self.batch_size_update): @@ -172,6 +178,13 @@ try: self.storage.content_update(contents, keys=keys) + count += len(contents) + status = 'eventful' except Exception: self.log.exception('Problem during update.') continue + + return { + 'status': status, + 'count': count, + } diff --git a/swh/indexer/sql/30-swh-schema.sql b/swh/indexer/sql/30-swh-schema.sql --- a/swh/indexer/sql/30-swh-schema.sql +++ b/swh/indexer/sql/30-swh-schema.sql @@ -14,7 +14,7 @@ ); insert into dbversion(version, release, description) - values(130, now(), 'Work In Progress'); + values(131, now(), 'Work In Progress'); -- Computing metadata on sha1's contents -- a SHA1 checksum (not necessarily originating from Git) diff --git a/swh/indexer/sql/40-swh-func.sql b/swh/indexer/sql/40-swh-func.sql --- a/swh/indexer/sql/40-swh-func.sql +++ b/swh/indexer/sql/40-swh-func.sql @@ -47,29 +47,31 @@ -- swh_content_mimetype_missing must take place before calling this -- function. -- --- -- operates in bulk: 0. swh_mktemp(content_mimetype), 1. COPY to tmp_content_mimetype, -- 2. call this function create or replace function swh_content_mimetype_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) select id, mimetype, encoding, indexer_configuration_id from tmp_content_mimetype tcm - on conflict(id, indexer_configuration_id) - do update set mimetype = excluded.mimetype, - encoding = excluded.encoding; - + on conflict(id, indexer_configuration_id) + do update set mimetype = excluded.mimetype, + encoding = excluded.encoding; else insert into content_mimetype (id, mimetype, encoding, indexer_configuration_id) select id, mimetype, encoding, indexer_configuration_id from tmp_content_mimetype tcm - on conflict(id, indexer_configuration_id) do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -85,25 +87,27 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_content_language, 2. call this function create or replace function swh_content_language_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_language (id, lang, indexer_configuration_id) select id, lang, indexer_configuration_id - from tmp_content_language tcl - on conflict(id, indexer_configuration_id) - do update set lang = excluded.lang; - + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do update set lang = excluded.lang; else insert into content_language (id, lang, indexer_configuration_id) select id, lang, indexer_configuration_id - from tmp_content_language tcl - on conflict(id, indexer_configuration_id) - do nothing; + from tmp_content_language tcl + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -141,9 +145,11 @@ -- operates in bulk: 0. swh_mktemp(content_ctags), 1. COPY to tmp_content_ctags, -- 2. call this function create or replace function swh_content_ctags_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then delete from content_ctags @@ -155,9 +161,10 @@ insert into content_ctags (id, name, kind, line, lang, indexer_configuration_id) select id, name, kind, line, lang, indexer_configuration_id from tmp_content_ctags tct - on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) - do nothing; - return; + on conflict(id, hash_sha1(name), kind, line, lang, indexer_configuration_id) + do nothing; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -217,9 +224,11 @@ -- operates in bulk: 0. swh_mktemp(content_fossology_license), 1. COPY to -- tmp_content_fossology_license, 2. call this function create or replace function swh_content_fossology_license_add(conflict_update boolean) - returns void - language plpgsql + returns bigint + language plpgsql as $$ +declare + res bigint; begin -- insert unknown licenses first insert into fossology_license (name) @@ -233,9 +242,8 @@ (select id from fossology_license where name = tcl.license) as license, indexer_configuration_id from tmp_content_fossology_license tcl - on conflict(id, license_id, indexer_configuration_id) - do update set license_id = excluded.license_id; - return; + on conflict(id, license_id, indexer_configuration_id) + do update set license_id = excluded.license_id; end if; insert into content_fossology_license (id, license_id, indexer_configuration_id) @@ -243,14 +251,17 @@ (select id from fossology_license where name = tcl.license) as license, indexer_configuration_id from tmp_content_fossology_license tcl - on conflict(id, license_id, indexer_configuration_id) - do nothing; - return; + on conflict(id, license_id, indexer_configuration_id) + do nothing; + + get diagnostics res = ROW_COUNT; + return res; end $$; comment on function swh_content_fossology_license_add(boolean) IS 'Add new content licenses'; + -- content_metadata functions -- add tmp_content_metadata entries to content_metadata, overwriting @@ -263,25 +274,27 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_content_metadata, 2. call this function create or replace function swh_content_metadata_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into content_metadata (id, metadata, indexer_configuration_id) select id, metadata, indexer_configuration_id - from tmp_content_metadata tcm - on conflict(id, indexer_configuration_id) - do update set metadata = excluded.metadata; - + from tmp_content_metadata tcm + on conflict(id, indexer_configuration_id) + do update set metadata = excluded.metadata; else insert into content_metadata (id, metadata, indexer_configuration_id) select id, metadata, indexer_configuration_id from tmp_content_metadata tcm - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -312,27 +325,29 @@ -- operates in bulk: 0. swh_mktemp(content_language), 1. COPY to -- tmp_revision_intrinsic_metadata, 2. call this function create or replace function swh_revision_intrinsic_metadata_add(conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin if conflict_update then insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) select id, metadata, mappings, indexer_configuration_id - from tmp_revision_intrinsic_metadata tcm - on conflict(id, indexer_configuration_id) - do update set - metadata = excluded.metadata, - mappings = excluded.mappings; - + from tmp_revision_intrinsic_metadata tcm + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + mappings = excluded.mappings; else insert into revision_intrinsic_metadata (id, metadata, mappings, indexer_configuration_id) select id, metadata, mappings, indexer_configuration_id from tmp_revision_intrinsic_metadata tcm - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; @@ -408,32 +423,34 @@ -- tmp_origin_intrinsic_metadata, 2. call this function create or replace function swh_origin_intrinsic_metadata_add( conflict_update boolean) - returns void + returns bigint language plpgsql as $$ +declare + res bigint; begin perform swh_origin_intrinsic_metadata_compute_tsvector(); if conflict_update then insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) select id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings - from tmp_origin_intrinsic_metadata - on conflict(id, indexer_configuration_id) - do update set - metadata = excluded.metadata, - metadata_tsvector = excluded.metadata_tsvector, - mappings = excluded.mappings, - from_revision = excluded.from_revision; - + from tmp_origin_intrinsic_metadata + on conflict(id, indexer_configuration_id) + do update set + metadata = excluded.metadata, + metadata_tsvector = excluded.metadata_tsvector, + mappings = excluded.mappings, + from_revision = excluded.from_revision; else insert into origin_intrinsic_metadata (id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings) select id, metadata, indexer_configuration_id, from_revision, metadata_tsvector, mappings from tmp_origin_intrinsic_metadata - on conflict(id, indexer_configuration_id) - do nothing; + on conflict(id, indexer_configuration_id) + do nothing; end if; - return; + get diagnostics res = ROW_COUNT; + return res; end $$; diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -9,9 +9,11 @@ import psycopg2.pool from collections import defaultdict, Counter +from typing import Dict, List from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError +from swh.storage.metrics import send_metric, timed, process_metrics from . import converters from .db import Db @@ -165,16 +167,31 @@ indexer_configuration_id, limit=limit, db=db, cur=cur) + @timed + @process_metrics @db_transaction() - def content_mimetype_add(self, mimetypes, conflict_update=False, db=None, - cur=None): + def content_mimetype_add( + self, mimetypes: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: + """Add mimetypes to the storage (if conflict_update is True, this will + override existing data if any). + + Returns: + A dict with the number of new elements added to the storage. + + """ check_id_duplicates(mimetypes) mimetypes.sort(key=lambda m: m['id']) db.mktemp_content_mimetype(cur) db.copy_to(mimetypes, 'tmp_content_mimetype', ['id', 'mimetype', 'encoding', 'indexer_configuration_id'], cur) - db.content_mimetype_add_from_temp(conflict_update, cur) + count = db.content_mimetype_add_from_temp(conflict_update, cur) + send_metric('content_mimetype:add', + count=count, method_name='content_mimetype_add') + return { + 'content_mimetype:add': count + } @db_transaction_generator() def content_mimetype_get(self, ids, db=None, cur=None): @@ -193,9 +210,12 @@ yield converters.db_to_language( dict(zip(db.content_language_cols, c))) + @timed + @process_metrics @db_transaction() - def content_language_add(self, languages, conflict_update=False, db=None, - cur=None): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False, db=None, cur=None) -> Dict: check_id_duplicates(languages) languages.sort(key=lambda m: m['id']) db.mktemp_content_language(cur) @@ -209,7 +229,12 @@ 'tmp_content_language', ['id', 'lang', 'indexer_configuration_id'], cur) - db.content_language_add_from_temp(conflict_update, cur) + count = db.content_language_add_from_temp(conflict_update, cur) + send_metric('content_language:add', + count=count, method_name='content_language_add') + return { + 'content_language:add': count + } @db_transaction_generator() def content_ctags_missing(self, ctags, db=None, cur=None): @@ -221,9 +246,12 @@ for c in db.content_ctags_get_from_list(ids, cur): yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c))) + @timed + @process_metrics @db_transaction() - def content_ctags_add(self, ctags, conflict_update=False, db=None, - cur=None): + def content_ctags_add( + self, ctags: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(ctags) ctags.sort(key=lambda m: m['id']) @@ -241,7 +269,12 @@ 'lang', 'indexer_configuration_id'], cur=cur) - db.content_ctags_add_from_temp(conflict_update, cur) + count = db.content_ctags_add_from_temp(conflict_update, cur) + send_metric('content_ctags:add', + count=count, method_name='content_ctags_add') + return { + 'content_ctags:add': count + } @db_transaction_generator() def content_ctags_search(self, expression, @@ -262,9 +295,12 @@ for id_, facts in d.items(): yield {id_: facts} + @timed + @process_metrics @db_transaction() - def content_fossology_license_add(self, licenses, conflict_update=False, - db=None, cur=None): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(licenses) licenses.sort(key=lambda m: m['id']) db.mktemp_content_fossology_license(cur) @@ -278,7 +314,13 @@ tblname='tmp_content_fossology_license', columns=['id', 'license', 'indexer_configuration_id'], cur=cur) - db.content_fossology_license_add_from_temp(conflict_update, cur) + count = db.content_fossology_license_add_from_temp( + conflict_update, cur) + send_metric('content_fossology_license:add', + count=count, method_name='content_fossology_license_add') + return { + 'content_fossology_license:add': count + } @db_transaction() def content_fossology_license_get_range( @@ -299,9 +341,12 @@ yield converters.db_to_metadata( dict(zip(db.content_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def content_metadata_add(self, metadata, conflict_update=False, db=None, - cur=None): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -310,7 +355,12 @@ db.copy_to(metadata, 'tmp_content_metadata', ['id', 'metadata', 'indexer_configuration_id'], cur) - db.content_metadata_add_from_temp(conflict_update, cur) + count = db.content_metadata_add_from_temp(conflict_update, cur) + send_metric('content_metadata:add', + count=count, method_name='content_metadata_add') + return { + 'content_metadata:add': count, + } @db_transaction_generator() def revision_intrinsic_metadata_missing(self, metadata, db=None, cur=None): @@ -324,9 +374,12 @@ yield converters.db_to_metadata( dict(zip(db.revision_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False, - db=None, cur=None): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -336,11 +389,23 @@ ['id', 'metadata', 'mappings', 'indexer_configuration_id'], cur) - db.revision_intrinsic_metadata_add_from_temp(conflict_update, cur) + count = db.revision_intrinsic_metadata_add_from_temp( + conflict_update, cur) + send_metric('revision_intrinsic_metadata:add', + count=count, method_name='revision_intrinsic_metadata_add') + return { + 'revision_intrinsic_metadata:add': count, + } + @timed + @process_metrics @db_transaction() - def revision_intrinsic_metadata_delete(self, entries, db=None, cur=None): - db.revision_intrinsic_metadata_delete(entries, cur) + def revision_intrinsic_metadata_delete( + self, entries: List[Dict], db=None, cur=None) -> Dict: + count = db.revision_intrinsic_metadata_delete(entries, cur) + return { + 'revision_intrinsic_metadata:del': count + } @db_transaction_generator() def origin_intrinsic_metadata_get(self, ids, db=None, cur=None): @@ -348,10 +413,12 @@ yield converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() - def origin_intrinsic_metadata_add(self, metadata, - conflict_update=False, db=None, - cur=None): + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -362,12 +429,23 @@ 'indexer_configuration_id', 'from_revision', 'mappings'], cur) - db.origin_intrinsic_metadata_add_from_temp(conflict_update, cur) + count = db.origin_intrinsic_metadata_add_from_temp( + conflict_update, cur) + send_metric('content_origin_intrinsic:add', + count=count, method_name='content_origin_intrinsic_add') + return { + 'origin_intrinsic_metadata:add': count, + } + @timed + @process_metrics @db_transaction() def origin_intrinsic_metadata_delete( - self, entries, db=None, cur=None): - db.origin_intrinsic_metadata_delete(entries, cur) + self, entries: List[Dict], db=None, cur=None) -> Dict: + count = db.origin_intrinsic_metadata_delete(entries, cur) + return { + 'origin_intrinsic_metadata:del': count, + } @db_transaction_generator() def origin_intrinsic_metadata_search_fulltext( diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -60,8 +60,10 @@ def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_mimetype_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute('select * from swh_content_mimetype_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def _convert_key(self, key, main_table='c'): """Convert keys according to specific use in the module. @@ -176,8 +178,10 @@ def mktemp_content_language(self, cur=None): pass def content_language_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_language_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute('select * from swh_content_language_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_language_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -201,8 +205,11 @@ def mktemp_content_ctags(self, cur=None): pass def content_ctags_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_ctags_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_ctags_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_ctags_get_from_list(self, ids, cur=None): cur = self._cursor(cur) @@ -252,9 +259,11 @@ """Add new licenses per content. """ - self._cursor(cur).execute( - "SELECT swh_content_fossology_license_add(%s)", + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_fossology_license_add(%s)', (conflict_update, )) + return cur.fetchone()[0] def content_fossology_license_get_from_list(self, ids, cur=None): """Retrieve licenses per id. @@ -293,8 +302,11 @@ def mktemp_content_metadata(self, cur=None): pass def content_metadata_add_from_temp(self, conflict_update, cur=None): - self._cursor(cur).execute("SELECT swh_content_metadata_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_content_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def content_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -321,9 +333,11 @@ def revision_intrinsic_metadata_add_from_temp( self, conflict_update, cur=None): - self._cursor(cur).execute( - "SELECT swh_revision_intrinsic_metadata_add(%s)", - (conflict_update, )) + cur = self._cursor(cur) + cur.execute( + 'select * from swh_revision_intrinsic_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def revision_intrinsic_metadata_delete( self, entries, cur=None): @@ -331,9 +345,11 @@ cur.execute( "DELETE from revision_intrinsic_metadata " "WHERE (id, indexer_configuration_id) IN " - " (VALUES %s)" % (', '.join('%s' for _ in entries)), + " (VALUES %s) " + "RETURNING id" % (', '.join('%s' for _ in entries)), tuple((e['id'], e['indexer_configuration_id']) for e in entries),) + return len(cur.fetchall()) def revision_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( @@ -358,8 +374,9 @@ self, conflict_update, cur=None): cur = self._cursor(cur) cur.execute( - "SELECT swh_origin_intrinsic_metadata_add(%s)", - (conflict_update, )) + 'select * from swh_origin_intrinsic_metadata_add(%s)', + (conflict_update, )) + return cur.fetchone()[0] def origin_intrinsic_metadata_delete( self, entries, cur=None): @@ -367,9 +384,11 @@ cur.execute( "DELETE from origin_intrinsic_metadata " "WHERE (id, indexer_configuration_id) IN" - " (VALUES %s)" % (', '.join('%s' for _ in entries)), + " (VALUES %s) " + "RETURNING id" % (', '.join('%s' for _ in entries)), tuple((e['id'], e['indexer_configuration_id']) for e in entries),) + return len(cur.fetchall()) def origin_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -123,7 +123,7 @@ 'next': None, } - def add(self, data, conflict_update): + def add(self, data: List[Dict], conflict_update: bool) -> int: """Add data not present in storage. Args: @@ -140,22 +140,27 @@ """ data = list(data) check_id_duplicates(data) + count = 0 for item in data: item = item.copy() tool_id = item.pop('indexer_configuration_id') id_ = item.pop('id') - data = item + data_item = item if not conflict_update and \ tool_id in self._tools_per_id.get(id_, set()): # Duplicate, should not be updated continue key = (id_, tool_id) - self._data[key] = data + self._data[key] = data_item self._tools_per_id[id_].add(tool_id) + count += 1 if id_ not in self._sorted_ids: bisect.insort(self._sorted_ids, id_) + return count - def add_merge(self, new_data, conflict_update, merged_key): + def add_merge(self, new_data: List[Dict], conflict_update: bool, + merged_key: str) -> int: + added = 0 for new_item in new_data: id_ = new_item['id'] tool_id = new_item['indexer_configuration_id'] @@ -172,7 +177,7 @@ for new_subitem in new_item[merged_key]: if new_subitem not in all_subitems: all_subitems.append(new_subitem) - self.add([ + added += self.add([ { 'id': id_, 'indexer_configuration_id': tool_id, @@ -181,15 +186,22 @@ ], conflict_update=True) if id_ not in self._sorted_ids: bisect.insort(self._sorted_ids, id_) + return added - def delete(self, entries): + def delete(self, entries: List[Dict]) -> int: + """Delete entries and return the number of entries deleted. + + """ + deleted = 0 for entry in entries: (id_, tool_id) = (entry['id'], entry['indexer_configuration_id']) key = (id_, tool_id) if tool_id in self._tools_per_id[id_]: self._tools_per_id[id_].remove(tool_id) if key in self._data: + deleted += 1 del self._data[key] + return deleted class IndexerStorage: @@ -216,9 +228,12 @@ return self._mimetypes.get_range( start, end, indexer_configuration_id, limit) - def content_mimetype_add(self, mimetypes, conflict_update=False): + def content_mimetype_add( + self, mimetypes: List[Dict], + conflict_update: bool = False) -> Dict: check_id_types(mimetypes) - self._mimetypes.add(mimetypes, conflict_update) + added = self._mimetypes.add(mimetypes, conflict_update) + return {'content_mimetype:add': added} def content_mimetype_get(self, ids): yield from self._mimetypes.get(ids) @@ -229,9 +244,12 @@ def content_language_get(self, ids): yield from self._languages.get(ids) - def content_language_add(self, languages, conflict_update=False): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False) -> Dict: check_id_types(languages) - self._languages.add(languages, conflict_update) + added = self._languages.add(languages, conflict_update) + return {'content_language:add': added} def content_ctags_missing(self, ctags): yield from self._content_ctags.missing(ctags) @@ -245,9 +263,11 @@ **item_ctags_item } - def content_ctags_add(self, ctags, conflict_update=False): + def content_ctags_add( + self, ctags: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(ctags) - self._content_ctags.add_merge(ctags, conflict_update, 'ctags') + added = self._content_ctags.add_merge(ctags, conflict_update, 'ctags') + return {'content_ctags:add': added} def content_ctags_search(self, expression, limit=10, last_sha1=None): @@ -279,9 +299,11 @@ for (id_, facts) in res.items(): yield {id_: facts} - def content_fossology_license_add(self, licenses, conflict_update=False): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(licenses) - self._licenses.add_merge(licenses, conflict_update, 'licenses') + added = self._licenses.add_merge(licenses, conflict_update, 'licenses') + return {'fossology_license_add:add': added} def content_fossology_license_get_range( self, start, end, indexer_configuration_id, limit=1000): @@ -294,9 +316,11 @@ def content_metadata_get(self, ids): yield from self._content_metadata.get(ids) - def content_metadata_add(self, metadata, conflict_update=False): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(metadata) - self._content_metadata.add(metadata, conflict_update) + added = self._content_metadata.add(metadata, conflict_update) + return {'content_metadata:add': added} def revision_intrinsic_metadata_missing(self, metadata): yield from self._revision_intrinsic_metadata.missing(metadata) @@ -304,22 +328,28 @@ def revision_intrinsic_metadata_get(self, ids): yield from self._revision_intrinsic_metadata.get(ids) - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: check_id_types(metadata) - self._revision_intrinsic_metadata.add(metadata, conflict_update) + added = self._revision_intrinsic_metadata.add( + metadata, conflict_update) + return {'revision_intrinsic_metadata:add': added} - def revision_intrinsic_metadata_delete(self, entries): - self._revision_intrinsic_metadata.delete(entries) + def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: + deleted = self._revision_intrinsic_metadata.delete(entries) + return {'revision_intrinsic_metadata:del': deleted} def origin_intrinsic_metadata_get(self, ids): yield from self._origin_intrinsic_metadata.get(ids) - def origin_intrinsic_metadata_add(self, metadata, - conflict_update=False): - self._origin_intrinsic_metadata.add(metadata, conflict_update) + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + added = self._origin_intrinsic_metadata.add(metadata, conflict_update) + return {'origin_intrinsic_metadata:add': added} - def origin_intrinsic_metadata_delete(self, entries): - self._origin_intrinsic_metadata.delete(entries) + def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: + deleted = self._origin_intrinsic_metadata.delete(entries) + return {'origin_intrinsic_metadata:del': deleted} def origin_intrinsic_metadata_search_fulltext( self, conjunction, limit=100): diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py --- a/swh/indexer/storage/interface.py +++ b/swh/indexer/storage/interface.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Dict, List from swh.core.api import remote_api_endpoint @@ -89,7 +90,8 @@ ... @remote_api_endpoint('content_mimetype/add') - def content_mimetype_add(self, mimetypes, conflict_update=False): + def content_mimetype_add(self, mimetypes: List[Dict], + conflict_update: bool = False) -> Dict: """Add mimetypes not present in storage. Args: @@ -104,6 +106,9 @@ overwrite (``True``) or skip duplicates (``False``, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -161,7 +166,9 @@ ... @remote_api_endpoint('content_language/add') - def content_language_add(self, languages, conflict_update=False): + def content_language_add( + self, languages: List[Dict], + conflict_update: bool = False) -> Dict: """Add languages not present in storage. Args: @@ -174,6 +181,9 @@ overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -216,7 +226,8 @@ ... @remote_api_endpoint('content/ctags/add') - def content_ctags_add(self, ctags, conflict_update=False): + def content_ctags_add(self, ctags: List[Dict], + conflict_update: bool = False) -> Dict: """Add ctags not present in storage Args: @@ -226,6 +237,9 @@ - **ctags** ([list): List of dictionary with keys: name, kind, line, lang + Returns: + Dict summary of number of rows added + """ ... @@ -263,7 +277,8 @@ ... @remote_api_endpoint('content/fossology_license/add') - def content_fossology_license_add(self, licenses, conflict_update=False): + def content_fossology_license_add( + self, licenses: List[Dict], conflict_update: bool = False) -> Dict: """Add licenses not present in storage. Args: @@ -277,7 +292,7 @@ or skip duplicates (false, the default) Returns: - list: content_license entries which failed due to unknown licenses + Dict summary of number of rows added """ ... @@ -343,7 +358,8 @@ ... @remote_api_endpoint('content_metadata/add') - def content_metadata_add(self, metadata, conflict_update=False): + def content_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add metadata not present in storage. Args: @@ -355,6 +371,9 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @@ -395,7 +414,8 @@ ... @remote_api_endpoint('revision_intrinsic_metadata/add') - def revision_intrinsic_metadata_add(self, metadata, conflict_update=False): + def revision_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add metadata not present in storage. Args: @@ -410,11 +430,14 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @remote_api_endpoint('revision_intrinsic_metadata/delete') - def revision_intrinsic_metadata_delete(self, entries): + def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict: """Remove revision metadata from the storage. Args: @@ -423,6 +446,9 @@ - **id** (bytes): revision identifier - **indexer_configuration_id** (int): tool used to compute metadata + + Returns: + Summary of number of rows deleted """ ... @@ -448,8 +474,8 @@ ... @remote_api_endpoint('origin_intrinsic_metadata/add') - def origin_intrinsic_metadata_add(self, metadata, - conflict_update=False): + def origin_intrinsic_metadata_add( + self, metadata: List[Dict], conflict_update: bool = False) -> Dict: """Add origin metadata not present in storage. Args: @@ -466,12 +492,15 @@ conflict_update: Flag to determine if we want to overwrite (true) or skip duplicates (false, the default) + Returns: + Dict summary of number of rows added + """ ... @remote_api_endpoint('origin_intrinsic_metadata/delete') def origin_intrinsic_metadata_delete( - self, entries): + self, entries: List[Dict]) -> Dict: """Remove origin metadata from the storage. Args: @@ -480,6 +509,9 @@ - **id** (str): origin urls - **indexer_configuration_id** (int): tool used to compute metadata + + Returns: + Summary of number of rows deleted """ ... diff --git a/swh/indexer/storage/metrics.py b/swh/indexer/storage/metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/storage/metrics.py @@ -0,0 +1,79 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps +import logging + +from swh.core.statsd import statsd + +OPERATIONS_METRIC = 'swh_indexer_storage_operations_total' +OPERATIONS_UNIT_METRIC = "swh_indexer_storage_operations_{unit}_total" +DURATION_METRIC = "swh_indexer_storage_request_duration_seconds" + + +def timed(f): + """Time that function! + + """ + @wraps(f) + def d(*a, **kw): + with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): + return f(*a, **kw) + + return d + + +def send_metric(metric, count, method_name): + """Send statsd metric with count for method `method_name` + + If count is 0, the metric is discarded. If the metric is not + parseable, the metric is discarded with a log message. + + Args: + metric (str): Metric's name (e.g content:add, content:add:bytes) + count (int): Associated value for the metric + method_name (str): Method's name + + Returns: + Bool to explicit if metric has been set or not + """ + if count == 0: + return False + + metric_type = metric.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = OPERATIONS_METRIC + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) + else: + logging.warning('Skipping unknown metric {%s: %s}' % ( + metric, count)) + return False + + statsd.increment( + metric_name, count, tags={ + 'endpoint': method_name, + 'object_type': object_type, + 'operation': operation, + }) + return True + + +def process_metrics(f): + """Increment object counters for the decorated function. + + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for metric, count in r.items(): + send_metric(metric=metric, count=count, method_name=f.__name__) + + return r + + return d diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -17,41 +17,34 @@ @app.task(name=__name__ + '.OriginMetadata') def origin_metadata(*args, **kwargs): - results = OriginMetadataIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return OriginMetadataIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.Ctags') def ctags(*args, **kwargs): - results = CtagsIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return CtagsIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentFossologyLicense') def fossology_license(*args, **kwargs): - results = FossologyLicenseIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return FossologyLicenseIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.RecomputeChecksums') def recompute_checksums(*args, **kwargs): - results = RecomputeChecksums().run(*args, **kwargs) - return getattr(results, 'results', results) + return RecomputeChecksums().run(*args, **kwargs) @app.task(name=__name__ + '.ContentMimetype') def mimetype(*args, **kwargs): - results = MimetypeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return MimetypeIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentRangeMimetype') def range_mimetype(*args, **kwargs): - results = MimetypeRangeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return MimetypeRangeIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentRangeFossologyLicense') def range_license(*args, **kwargs): - results = FossologyLicenseRangeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return FossologyLicenseRangeIndexer().run(*args, **kwargs) diff --git a/swh/indexer/tests/storage/test_metrics.py b/swh/indexer/tests/storage/test_metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/storage/test_metrics.py @@ -0,0 +1,53 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import patch + +from swh.indexer.storage.metrics import ( + send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC +) + + +def test_send_metric_unknown_unit(): + r = send_metric('content', count=10, method_name='content_add') + assert r is False + r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') + assert r is False + + +def test_send_metric_no_value(): + r = send_metric('content_mimetype:add', count=0, + method_name='content_mimetype_add') + assert r is False + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_no_unit(mock_statsd): + r = send_metric('content_mimetype:add', count=10, + method_name='content_mimetype_add') + + mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ + 'endpoint': 'content_mimetype_add', + 'object_type': 'content_mimetype', + 'operation': 'add', + }) + + assert r + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_unit(mock_statsd): + unit_ = 'bytes' + r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') + + expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) + mock_statsd.assert_called_with( + expected_metric, 100, tags={ + 'endpoint': 'c_add', + 'object_type': 'c', + 'operation': 'add', + }) + + assert r diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,8 @@ import inspect import threading +from typing import Dict + import pytest from swh.model.hashutil import hash_to_bytes @@ -31,8 +33,39 @@ return mimetypes -def endpoint(storage, endpoint_type, endpoint_name): - return getattr(storage, endpoint_type + '_' + endpoint_name) +def endpoint_name(etype: str, ename: str) -> str: + """Compute the storage's endpoint's name + + >>> endpoint_name('content_mimetype', 'add') + 'content_mimetype_add' + >>> endpoint_name('content_fosso_license', 'delete') + 'content_fosso_license_delete' + + """ + return f'{etype}_{ename}' + + +def endpoint(storage, etype: str, ename: str): + return getattr(storage, endpoint_name(etype, ename)) + + +def expected_summary( + count: int, etype: str, ename: str = 'add') -> Dict[str, int]: + """Compute the expected summary + + The key is determine according to etype and ename + + >>> expected_summary(10, 'content_mimetype', 'add') + {'content_mimetype:add': 10} + >>> expected_summary(9, 'origin_intrinsic_metadata', 'delete') + {'origin_intrinsic_metadata:del': 9} + + """ + pattern = ename[0:3] + key = endpoint_name(etype, ename).replace(f'_{ename}', f':{pattern}') + return { + key: count + } class StorageETypeTester: @@ -71,12 +104,14 @@ ] # now, when we add one of them - endpoint(storage, etype, 'add')([{ + summary = endpoint(storage, etype, 'add')([{ 'id': data.sha1_2, **self.example_data[0], 'indexer_configuration_id': tool_id, }]) + assert summary == expected_summary(1, etype) + # we expect only the other one returned actual_missing = endpoint(storage, etype, 'missing')(query) assert list(actual_missing) == [data.sha1_1] @@ -92,7 +127,8 @@ **self.example_data[0], 'indexer_configuration_id': tool_id, } - endpoint(storage, etype, 'add')([data_v1]) + summary = endpoint(storage, etype, 'add')([data_v1]) + assert summary == expected_summary(1, etype) # should be able to retrieve it actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -106,7 +142,8 @@ # now if we add a modified version of the same object (same id) data_v2 = data_v1.copy() data_v2.update(self.example_data[1]) - endpoint(storage, etype, 'add')([data_v2]) + summary2 = endpoint(storage, etype, 'add')([data_v2]) + assert summary2 == expected_summary(0, etype) # not added # we expect to retrieve the original data, not the modified one actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -125,7 +162,8 @@ } # given - endpoint(storage, etype, 'add')([data_v1]) + summary = endpoint(storage, etype, 'add')([data_v1]) + assert summary == expected_summary(1, etype) # not added # when actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -144,6 +182,7 @@ data_v2.update(self.example_data[1]) endpoint(storage, etype, 'add')([data_v2], conflict_update=True) + assert summary == expected_summary(1, etype) # modified so counted actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2])) @@ -254,7 +293,8 @@ } # when - endpoint(storage, etype, 'add')([data_rev1]) + summary = endpoint(storage, etype, 'add')([data_rev1]) + assert summary == expected_summary(1, etype) with pytest.raises(DuplicateId): endpoint(storage, etype, 'add')( @@ -285,7 +325,8 @@ } # when - endpoint(storage, etype, 'add')([data1]) + summary = endpoint(storage, etype, 'add')([data1]) + assert summary == expected_summary(1, etype) # then actual_data = list(endpoint(storage, etype, 'get')(query)) @@ -843,13 +884,16 @@ } # when - endpoint(storage, etype, 'add')([data1]) - endpoint(storage, etype, 'delete')([ + summary = endpoint(storage, etype, 'add')([data1]) + assert summary == expected_summary(1, etype) + + summary2 = endpoint(storage, etype, 'delete')([ { 'id': data.sha1_2, 'indexer_configuration_id': tool['id'], } ]) + assert summary2 == expected_summary(1, etype, 'del') # then actual_data = list(endpoint(storage, etype, 'get')(query)) diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py --- a/swh/indexer/tests/test_fossology_license.py +++ b/swh/indexer/tests/test_fossology_license.py @@ -36,7 +36,7 @@ ['GPL', 'AGPL'])]: mock_subprocess.check_output.return_value = intermediary_result - actual_result = compute_license(path, log=None) + actual_result = compute_license(path) self.assertEqual(actual_result, { 'licenses': output, @@ -44,7 +44,7 @@ }) -def mock_compute_license(path, log=None): +def mock_compute_license(path): """path is the content identifier """ diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py --- a/swh/indexer/tests/utils.py +++ b/swh/indexer/tests/utils.py @@ -746,12 +746,12 @@ start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given - actual_results = self.indexer.run( # checks the bytes input this time + actual_results = self.indexer.run( start, end, skip_existing=False) # no already indexed data so same result as prior test # then - self.assertTrue(actual_results) + self.assertEquals(actual_results, {'status': 'uneventful'}) def test_generate_content_get_no_result(self): """No result indexed returns False""" @@ -759,8 +759,7 @@ '0000000000000000000000000000000000000001'] start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given - actual_results = self.indexer.run( - start, end, incremental=False) + actual_results = self.indexer.run(start, end, incremental=False) # then - self.assertFalse(actual_results) + self.assertEquals(actual_results, {'status': 'uneventful'})