diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py --- a/swh/indexer/ctags.py +++ b/swh/indexer/ctags.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,6 +6,8 @@ import subprocess import json +from typing import Dict, List + from swh.model import hashutil from .indexer import ContentIndexer, write_to_temp @@ -135,17 +137,18 @@ return ctags - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str) -> Dict: """Persist the results in storage. Args: - results ([dict]): list of content_mimetype, dict with the + results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - ctags ([dict]): ctags list of symbols - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ - self.idx_storage.content_ctags_add( + return self.idx_storage.content_ctags_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -1,17 +1,21 @@ -# Copyright (C) 2016-2018 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging import subprocess -from typing import Optional +from typing import Any, Dict, List, Optional from swh.model import hashutil from .indexer import ContentIndexer, ContentRangeIndexer, write_to_temp -def compute_license(path, log=None): +logger = logging.getLogger(__name__) + + +def compute_license(path): """Determine license from file at path. Args: @@ -38,10 +42,9 @@ 'path': path, } except subprocess.CalledProcessError: - if log: - from os import path as __path - log.exception('Problem during license detection for sha1 %s' % - __path.basename(path)) + from os import path as __path + logger.exception('Problem during license detection for sha1 %s' % + __path.basename(path)) return { 'licenses': [], 'path': path, @@ -68,12 +71,15 @@ } CONFIG_BASE_FILENAME = 'indexer/fossology_license' # type: Optional[str] + tool: Any + idx_storage: Any def prepare(self): super().prepare() self.working_directory = self.config['workdir'] - def index(self, id, data): + def index(self, id: bytes, data: Optional[bytes] = None, + **kwargs) -> Dict[str, Any]: """Index sha1s' content and store result. 
Args: @@ -90,33 +96,35 @@ """ assert isinstance(id, bytes) + assert data is not None with write_to_temp( filename=hashutil.hash_to_hex(id), # use the id as pathname data=data, working_directory=self.working_directory) as content_path: - properties = compute_license(path=content_path, log=self.log) + properties = compute_license(path=content_path) properties.update({ 'id': id, 'indexer_configuration_id': self.tool['id'], }) return properties - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str) -> Dict: """Persist the results in storage. Args: - results ([dict]): list of content_license, dict with the + results: list of content_license dict with the following keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ - self.idx_storage.content_fossology_license_add( + return self.idx_storage.content_fossology_license_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -10,8 +10,9 @@ import tempfile from contextlib import contextmanager -from typing import Any, Dict, Tuple, Generator, Union, List -from typing import Set +from typing import ( + Any, Dict, Iterator, List, Optional, Set, Tuple, Union +) from swh.scheduler import CONFIG as SWH_CONFIG @@ -26,8 +27,7 @@ @contextmanager def write_to_temp( - filename: str, data: bytes, working_directory: str -) -> Generator[str, None, None]: + filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: @@ -238,9 +238,8 @@ else: return [] - def index( - self, id: bytes, data: bytes - ) -> Dict[str, Any]: + def index(self, id: bytes, data: Optional[bytes] = None, + **kwargs) -> Dict[str, Any]: """Index computation for the id and associated raw data. Args: @@ -255,7 +254,7 @@ """ raise NotImplementedError() - def filter(self, ids: List[bytes]) -> Generator[bytes, None, None]: + def filter(self, ids: List[bytes]) -> Iterator[bytes]: """Filter missing ids for that particular indexer. Args: @@ -268,7 +267,7 @@ yield from ids @abc.abstractmethod - def persist_index_computations(self, results, policy_update): + def persist_index_computations(self, results, policy_update) -> Dict: """Persist the computation resulting from the index. Args: @@ -279,27 +278,10 @@ respectively update duplicates or ignore them Returns: - None + a summary dict of what has been inserted in the storage """ - pass - - @abc.abstractmethod - def run(self, ids, policy_update, **kwargs): - """Given a list of ids: - - - retrieves the data from the storage - - executes the indexing computations - - stores the results (according to policy_update) - - Args: - ids ([bytes]): id's identifier list - policy_update (str): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them - **kwargs: passed to the `index` method - - """ - pass + return {} class ContentIndexer(BaseIndexer): @@ -313,8 +295,8 @@ methods mentioned in the :class:`BaseIndexer` class. 
""" - - def run(self, ids, policy_update, **kwargs): + def run(self, ids: Union[List[bytes], bytes, str], policy_update: str, + **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage @@ -328,12 +310,17 @@ them **kwargs: passed to the `index` method + Returns: + A summary Dict of the task's status + """ - ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ - for id_ in ids] + status = 'uneventful' + sha1s = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ + for id_ in ids] results = [] + summary: Dict = {} try: - for sha1 in ids: + for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: @@ -343,14 +330,18 @@ res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) - - self.persist_index_computations(results, policy_update) + status = 'eventful' + summary = self.persist_index_computations(results, policy_update) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception( 'Problem when reading contents metadata.') + status = 'failed' + finally: + summary['status'] = status + return summary class ContentRangeIndexer(BaseIndexer): @@ -383,7 +374,7 @@ def _list_contents_to_index( self, start: bytes, end: bytes, indexed: Set[bytes] - ) -> Generator[bytes, None, None]: + ) -> Iterator[bytes]: """Compute from storage the new contents to index in the range [start, end]. The already indexed contents are skipped. @@ -411,7 +402,7 @@ def _index_contents( self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any - ) -> Generator[Dict, None, None]: + ) -> Iterator[Dict]: """Index the contents from within range [start, end] Args: @@ -430,7 +421,7 @@ self.log.warning('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue - res = self.index(sha1, raw_content, **kwargs) # type: ignore + res = self.index(sha1, raw_content, **kwargs) if res: if not isinstance(res['id'], bytes): raise TypeError( @@ -439,8 +430,7 @@ yield res def _index_with_skipping_already_done( - self, start: bytes, end: bytes - ) -> Generator[Dict, None, None]: + self, start: bytes, end: bytes) -> Iterator[Dict]: """Index not already indexed contents in range [start, end]. Args: @@ -460,47 +450,55 @@ start, _end, contents) start = indexed_page['next'] - def run(self, start, end, skip_existing=True, **kwargs): + def run(self, start: Union[bytes, str], end: Union[bytes, str], + skip_existing: bool = True, **kwargs) -> Dict: """Given a range of content ids, compute the indexing computations on the contents within. Either the indexer is incremental (filter out existing computed data) or not (compute everything from scratch). Args: - start (Union[bytes, str]): Starting range identifier - end (Union[bytes, str]): Ending range identifier - skip_existing (bool): Skip existing indexed data + start: Starting range identifier + end: Ending range identifier + skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: - bool: True if data was indexed, False otherwise. 
+            A dict with the task's status

         """
-        with_indexed_data = False
+        status = 'uneventful'
+        summary: Dict = {}
         try:
-            if isinstance(start, str):
-                start = hashutil.hash_to_bytes(start)
-            if isinstance(end, str):
-                end = hashutil.hash_to_bytes(end)
+            range_start = hashutil.hash_to_bytes(start) \
+                if isinstance(start, str) else start
+            range_end = hashutil.hash_to_bytes(end) \
+                if isinstance(end, str) else end
             if skip_existing:
-                gen = self._index_with_skipping_already_done(start, end)
+                gen = self._index_with_skipping_already_done(
+                    range_start, range_end)
             else:
-                gen = self._index_contents(start, end, indexed=[])
-
-            for results in utils.grouper(gen,
-                                         n=self.config['write_batch_size']):
-                self.persist_index_computations(
-                    results, policy_update='update-dups')
-                with_indexed_data = True
+                gen = self._index_contents(
+                    range_start, range_end, indexed=set([]))
+
+            for contents in utils.grouper(
+                    gen, n=self.config['write_batch_size']):
+                res = self.persist_index_computations(
+                    contents, policy_update='update-dups')
+                for key, value in res.items():
+                    summary[key] = summary.get(key, 0) + value
+                status = 'eventful'
         except Exception:
             if not self.catch_exceptions:
                 raise
             self.log.exception(
                 'Problem when computing metadata.')
+            status = 'failed'
         finally:
-            return with_indexed_data
+            summary['status'] = status
+            return summary


 class OriginIndexer(BaseIndexer):
@@ -513,8 +511,8 @@
     class.

     """
-    def run(self, origin_urls, policy_update='update-dups',
-            next_step=None, **kwargs):
+    def run(self, origin_urls: List[str],
+            policy_update: str = 'update-dups', **kwargs) -> Dict:
         """Given a list of origin urls:

         - retrieve origins from storage
@@ -522,17 +520,16 @@
         - store the results (according to policy_update)

         Args:
-            origin_urls ([str]): list of origin urls.
-            policy_update (str): either 'update-dups' or 'ignore-dups' to
+            origin_urls: list of origin urls.
+            policy_update: either 'update-dups' or 'ignore-dups' to
                 respectively update duplicates (default) or ignore them
-            parse_ids (bool): Do we need to parse id or not (default)
             **kwargs: passed to the `index` method

         """
         results = self.index_list(origin_urls, **kwargs)
-
-        self.persist_index_computations(results, policy_update)
+        summary = self.persist_index_computations(results, policy_update)
         self.results = results
+        return summary

     def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]:
         results = []
@@ -560,7 +557,7 @@
         class.
""" - def run(self, ids, policy_update): + def run(self, ids: Union[str, bytes], policy_update: str) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage @@ -568,15 +565,15 @@ - store the results (according to policy_update) Args: - ids ([bytes or str]): sha1_git's identifier list - policy_update (str): either 'update-dups' or 'ignore-dups' to + ids: sha1_git's identifier list + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] - ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ - for id_ in ids] - revs = self.storage.revision_get(ids) + revs = self.storage.revision_get( + hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ + for id_ in ids) for rev in revs: if not rev: @@ -592,5 +589,6 @@ raise self.log.exception( 'Problem when processing revision') - self.persist_index_computations(results, policy_update) + summary = self.persist_index_computations(results, policy_update) self.results = results + return summary diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -5,7 +5,7 @@ from copy import deepcopy -from typing import Any, List, Dict, Tuple, Callable, Generator +from typing import Any, Callable, Dict, Iterator, List, Tuple from swh.core.utils import grouper @@ -26,7 +26,7 @@ def call_with_batches( f: Callable[[List[Dict[str, Any]]], Dict['str', Any]], args: List[Dict[str, str]], batch_size: int -) -> Generator[str, None, None]: +) -> Iterator[str]: """Calls a function with batches of args, and concatenates the results. """ groups = grouper(args, batch_size) @@ -89,7 +89,7 @@ def persist_index_computations( self, results: List[Dict], policy_update: str - ) -> None: + ) -> Dict: """Persist the results in storage. Args: @@ -101,7 +101,7 @@ respectively update duplicates or ignore them """ - self.idx_storage.content_metadata_add( + return self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) @@ -188,7 +188,7 @@ def persist_index_computations( self, results: List[Dict], policy_update: str - ) -> None: + ) -> Dict: """Persist the results in storage. 
         Args:
@@ -203,7 +203,7 @@
         """
         # TODO: add functions in storage to keep data in
         # revision_intrinsic_metadata
-        self.idx_storage.revision_intrinsic_metadata_add(
+        return self.idx_storage.revision_intrinsic_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def translate_revision_intrinsic_metadata(
@@ -327,7 +327,7 @@

     def persist_index_computations(
         self, results: List[Dict], policy_update: str
-    ) -> None:
+    ) -> Dict:
         conflict_update = (policy_update == 'update-dups')

         # Deduplicate revisions
@@ -335,6 +335,7 @@
         orig_metadata: List[Any] = []
         revs_to_delete: List[Any] = []
         origs_to_delete: List[Any] = []
+        summary: Dict = {}
         for (orig_item, rev_item) in results:
             assert rev_item['metadata'] == orig_item['metadata']
             if not rev_item['metadata'] or \
@@ -352,17 +353,25 @@
             orig_metadata.append(orig_item)

         if rev_metadata:
-            self.idx_storage.revision_intrinsic_metadata_add(
+            summary_rev = self.idx_storage.revision_intrinsic_metadata_add(
                 rev_metadata, conflict_update=conflict_update)
+            summary.update(summary_rev)
         if orig_metadata:
-            self.idx_storage.origin_intrinsic_metadata_add(
+            summary_ori = self.idx_storage.origin_intrinsic_metadata_add(
                 orig_metadata, conflict_update=conflict_update)
+            summary.update(summary_ori)

         # revs_to_delete should always be empty unless we changed a mapping
         # to detect less files or less content.
         # However, origs_to_delete may be empty whenever an upstream deletes
         # a metadata file.
         if origs_to_delete:
-            self.idx_storage.origin_intrinsic_metadata_delete(origs_to_delete)
+            summary_ori = self.idx_storage.origin_intrinsic_metadata_delete(
+                origs_to_delete)
+            summary.update(summary_ori)
         if revs_to_delete:
-            self.idx_storage.revision_intrinsic_metadata_delete(revs_to_delete)
+            summary_rev = self.idx_storage.revision_intrinsic_metadata_delete(
+                revs_to_delete)
+            summary.update(summary_rev)
+
+        return summary
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -55,7 +55,8 @@

     CONFIG_BASE_FILENAME = 'indexer/mimetype'  # type: Optional[str]

-    def index(self, id: bytes, data: bytes) -> Dict[str, Any]:
+    def index(self, id: bytes, data: Optional[bytes] = None,
+              **kwargs) -> Dict[str, Any]:
         """Index sha1s' content and store result.

         Args:
@@ -70,6 +71,7 @@
             - encoding: encoding in bytes

         """
+        assert data is not None
         properties = compute_mimetype_encoding(data)
         properties.update({
             'id': id,
@@ -78,8 +80,8 @@
         return properties

     def persist_index_computations(
-        self, results: List[Dict], policy_update: List[str]
-    ) -> None:
+        self, results: List[Dict], policy_update: str
+    ) -> Dict:
         """Persist the results in storage.

         Args:
@@ -90,7 +92,7 @@
             respectively update duplicates or ignore them

         """
-        self.idx_storage.content_mimetype_add(
+        return self.idx_storage.content_mimetype_add(
             results, conflict_update=(policy_update == 'update-dups'))


diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -24,10 +24,10 @@

     def persist_index_computations(
         self, results: Any, policy_update: str
-    ) -> None:
+    ) -> Dict:
         """Do nothing.
The indexer's results are not persistent, they should only be piped to another indexer.""" - pass + return {} # Dispatch diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -145,19 +145,25 @@ content.update(content_hashes) yield content, checksums_to_compute - def run(self, contents: List[Dict[str, Any]]) -> None: + def run(self, contents: List[Dict[str, Any]]) -> Dict: """Given a list of content: - (re)compute a given set of checksums on contents available in our object storage - update those contents with the new metadata - Args: - contents: contents as dictionary with necessary keys. - key present in such dictionary should be the ones defined in - the 'primary_key' option. + Args: + contents: contents as dictionary with necessary keys. + key present in such dictionary should be the ones defined in + the 'primary_key' option. + + Returns: + A summary dict with key 'status', task' status and 'count' the + number of updated contents. """ + status = 'uneventful' + count = 0 for data in utils.grouper( self.get_new_contents_metadata(contents), self.batch_size_update): @@ -172,6 +178,13 @@ try: self.storage.content_update(contents, keys=keys) + count += len(contents) + status = 'eventful' except Exception: self.log.exception('Problem during update.') continue + + return { + 'status': status, + 'count': count, + } diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -13,6 +13,7 @@ from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError +from swh.storage.metrics import send_metric, timed, process_metrics from . import converters from .db import Db @@ -166,10 +167,12 @@ indexer_configuration_id, limit=limit, db=db, cur=cur) + @timed + @process_metrics @db_transaction() def content_mimetype_add( self, mimetypes: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: """Add mimetypes to the storage (if conflict_update is True, this will override existing data if any). 
@@ -184,6 +187,8 @@ ['id', 'mimetype', 'encoding', 'indexer_configuration_id'], cur) count = db.content_mimetype_add_from_temp(conflict_update, cur) + send_metric('content_mimetype:add', + count=count, method_name='content_mimetype_add') return { 'content_mimetype:add': count } @@ -205,10 +210,12 @@ yield converters.db_to_language( dict(zip(db.content_language_cols, c))) + @timed + @process_metrics @db_transaction() def content_language_add( - self, languages: List[Dict], - conflict_update: bool = False, db=None, cur=None) -> Dict: + self, languages: List[Dict], conflict_update: bool = False, + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(languages) languages.sort(key=lambda m: m['id']) db.mktemp_content_language(cur) @@ -223,6 +230,8 @@ ['id', 'lang', 'indexer_configuration_id'], cur) count = db.content_language_add_from_temp(conflict_update, cur) + send_metric('content_language:add', + count=count, method_name='content_language_add') return { 'content_language:add': count } @@ -237,10 +246,12 @@ for c in db.content_ctags_get_from_list(ids, cur): yield converters.db_to_ctags(dict(zip(db.content_ctags_cols, c))) + @timed + @process_metrics @db_transaction() def content_ctags_add( self, ctags: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(ctags) ctags.sort(key=lambda m: m['id']) @@ -259,6 +270,8 @@ cur=cur) count = db.content_ctags_add_from_temp(conflict_update, cur) + send_metric('content_ctags:add', + count=count, method_name='content_ctags_add') return { 'content_ctags:add': count } @@ -282,10 +295,12 @@ for id_, facts in d.items(): yield {id_: facts} + @timed + @process_metrics @db_transaction() def content_fossology_license_add( self, licenses: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(licenses) licenses.sort(key=lambda m: m['id']) db.mktemp_content_fossology_license(cur) @@ -301,6 +316,8 @@ cur=cur) count = db.content_fossology_license_add_from_temp( conflict_update, cur) + send_metric('content_fossology_license:add', + count=count, method_name='content_fossology_license_add') return { 'content_fossology_license:add': count } @@ -324,10 +341,12 @@ yield converters.db_to_metadata( dict(zip(db.content_metadata_cols, c))) + @timed + @process_metrics @db_transaction() def content_metadata_add( self, metadata: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -337,6 +356,8 @@ ['id', 'metadata', 'indexer_configuration_id'], cur) count = db.content_metadata_add_from_temp(conflict_update, cur) + send_metric('content_metadata:add', + count=count, method_name='content_metadata_add') return { 'content_metadata:add': count, } @@ -353,10 +374,12 @@ yield converters.db_to_metadata( dict(zip(db.revision_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() def revision_intrinsic_metadata_add( self, metadata: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -368,10 +391,14 @@ cur) count = db.revision_intrinsic_metadata_add_from_temp( conflict_update, cur) + send_metric('revision_intrinsic_metadata:add', + count=count, method_name='revision_intrinsic_metadata_add') return { 'revision_intrinsic_metadata:add': count, } + @timed + 
@process_metrics @db_transaction() def revision_intrinsic_metadata_delete( self, entries: List[Dict], db=None, cur=None) -> Dict: @@ -386,10 +413,12 @@ yield converters.db_to_metadata( dict(zip(db.origin_intrinsic_metadata_cols, c))) + @timed + @process_metrics @db_transaction() def origin_intrinsic_metadata_add( self, metadata: List[Dict], conflict_update: bool = False, - db=None, cur=None) -> Dict: + db=None, cur=None) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m['id']) @@ -402,10 +431,14 @@ cur) count = db.origin_intrinsic_metadata_add_from_temp( conflict_update, cur) + send_metric('content_origin_intrinsic:add', + count=count, method_name='content_origin_intrinsic_add') return { 'origin_intrinsic_metadata:add': count, } + @timed + @process_metrics @db_transaction() def origin_intrinsic_metadata_delete( self, entries: List[Dict], db=None, cur=None) -> Dict: diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -230,7 +230,7 @@ def content_mimetype_add( self, mimetypes: List[Dict], - conflict_update: bool = False) -> Dict: + conflict_update: bool = False) -> Dict[str, int]: check_id_types(mimetypes) added = self._mimetypes.add(mimetypes, conflict_update) return {'content_mimetype:add': added} @@ -246,7 +246,7 @@ def content_language_add( self, languages: List[Dict], - conflict_update: bool = False) -> Dict: + conflict_update: bool = False) -> Dict[str, int]: check_id_types(languages) added = self._languages.add(languages, conflict_update) return {'content_language:add': added} @@ -264,7 +264,8 @@ } def content_ctags_add( - self, ctags: List[Dict], conflict_update: bool = False) -> Dict: + self, ctags: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: check_id_types(ctags) added = self._content_ctags.add_merge(ctags, conflict_update, 'ctags') return {'content_ctags:add': added} @@ -300,7 +301,8 @@ yield {id_: facts} def content_fossology_license_add( - self, licenses: List[Dict], conflict_update: bool = False) -> Dict: + self, licenses: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: check_id_types(licenses) added = self._licenses.add_merge(licenses, conflict_update, 'licenses') return {'fossology_license_add:add': added} @@ -317,7 +319,8 @@ yield from self._content_metadata.get(ids) def content_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: check_id_types(metadata) added = self._content_metadata.add(metadata, conflict_update) return {'content_metadata:add': added} @@ -329,7 +332,8 @@ yield from self._revision_intrinsic_metadata.get(ids) def revision_intrinsic_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: check_id_types(metadata) added = self._revision_intrinsic_metadata.add( metadata, conflict_update) @@ -343,7 +347,8 @@ yield from self._origin_intrinsic_metadata.get(ids) def origin_intrinsic_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: added = self._origin_intrinsic_metadata.add(metadata, conflict_update) return {'origin_intrinsic_metadata:add': added} diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py --- 
a/swh/indexer/storage/interface.py +++ b/swh/indexer/storage/interface.py @@ -91,7 +91,7 @@ @remote_api_endpoint('content_mimetype/add') def content_mimetype_add(self, mimetypes: List[Dict], - conflict_update: bool = False) -> Dict: + conflict_update: bool = False) -> Dict[str, int]: """Add mimetypes not present in storage. Args: @@ -168,7 +168,7 @@ @remote_api_endpoint('content_language/add') def content_language_add( self, languages: List[Dict], - conflict_update: bool = False) -> Dict: + conflict_update: bool = False) -> Dict[str, int]: """Add languages not present in storage. Args: @@ -227,7 +227,7 @@ @remote_api_endpoint('content/ctags/add') def content_ctags_add(self, ctags: List[Dict], - conflict_update: bool = False) -> Dict: + conflict_update: bool = False) -> Dict[str, int]: """Add ctags not present in storage Args: @@ -278,7 +278,8 @@ @remote_api_endpoint('content/fossology_license/add') def content_fossology_license_add( - self, licenses: List[Dict], conflict_update: bool = False) -> Dict: + self, licenses: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: """Add licenses not present in storage. Args: @@ -359,7 +360,8 @@ @remote_api_endpoint('content_metadata/add') def content_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: """Add metadata not present in storage. Args: @@ -415,7 +417,8 @@ @remote_api_endpoint('revision_intrinsic_metadata/add') def revision_intrinsic_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: """Add metadata not present in storage. Args: @@ -475,7 +478,8 @@ @remote_api_endpoint('origin_intrinsic_metadata/add') def origin_intrinsic_metadata_add( - self, metadata: List[Dict], conflict_update: bool = False) -> Dict: + self, metadata: List[Dict], + conflict_update: bool = False) -> Dict[str, int]: """Add origin metadata not present in storage. Args: diff --git a/swh/indexer/storage/metrics.py b/swh/indexer/storage/metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/storage/metrics.py @@ -0,0 +1,79 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps +import logging + +from swh.core.statsd import statsd + +OPERATIONS_METRIC = 'swh_indexer_storage_operations_total' +OPERATIONS_UNIT_METRIC = "swh_indexer_storage_operations_{unit}_total" +DURATION_METRIC = "swh_indexer_storage_request_duration_seconds" + + +def timed(f): + """Time that function! + + """ + @wraps(f) + def d(*a, **kw): + with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): + return f(*a, **kw) + + return d + + +def send_metric(metric, count, method_name): + """Send statsd metric with count for method `method_name` + + If count is 0, the metric is discarded. If the metric is not + parseable, the metric is discarded with a log message. 
+ + Args: + metric (str): Metric's name (e.g content:add, content:add:bytes) + count (int): Associated value for the metric + method_name (str): Method's name + + Returns: + Bool to explicit if metric has been set or not + """ + if count == 0: + return False + + metric_type = metric.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = OPERATIONS_METRIC + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) + else: + logging.warning('Skipping unknown metric {%s: %s}' % ( + metric, count)) + return False + + statsd.increment( + metric_name, count, tags={ + 'endpoint': method_name, + 'object_type': object_type, + 'operation': operation, + }) + return True + + +def process_metrics(f): + """Increment object counters for the decorated function. + + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for metric, count in r.items(): + send_metric(metric=metric, count=count, method_name=f.__name__) + + return r + + return d diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -17,41 +17,34 @@ @app.task(name=__name__ + '.OriginMetadata') def origin_metadata(*args, **kwargs): - results = OriginMetadataIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return OriginMetadataIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.Ctags') def ctags(*args, **kwargs): - results = CtagsIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return CtagsIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentFossologyLicense') def fossology_license(*args, **kwargs): - results = FossologyLicenseIndexer().run(*args, **kwargs) - return getattr(results, 'results', results) + return FossologyLicenseIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.RecomputeChecksums') def recompute_checksums(*args, **kwargs): - results = RecomputeChecksums().run(*args, **kwargs) - return getattr(results, 'results', results) + return RecomputeChecksums().run(*args, **kwargs) @app.task(name=__name__ + '.ContentMimetype') def mimetype(*args, **kwargs): - results = MimetypeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return MimetypeIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentRangeMimetype') def range_mimetype(*args, **kwargs): - results = MimetypeRangeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return MimetypeRangeIndexer().run(*args, **kwargs) @app.task(name=__name__ + '.ContentRangeFossologyLicense') def range_license(*args, **kwargs): - results = FossologyLicenseRangeIndexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} + return FossologyLicenseRangeIndexer().run(*args, **kwargs) diff --git a/swh/indexer/tests/storage/test_metrics.py b/swh/indexer/tests/storage/test_metrics.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/storage/test_metrics.py @@ -0,0 +1,53 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this 
distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import patch + +from swh.indexer.storage.metrics import ( + send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC +) + + +def test_send_metric_unknown_unit(): + r = send_metric('content', count=10, method_name='content_add') + assert r is False + r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') + assert r is False + + +def test_send_metric_no_value(): + r = send_metric('content_mimetype:add', count=0, + method_name='content_mimetype_add') + assert r is False + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_no_unit(mock_statsd): + r = send_metric('content_mimetype:add', count=10, + method_name='content_mimetype_add') + + mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ + 'endpoint': 'content_mimetype_add', + 'object_type': 'content_mimetype', + 'operation': 'add', + }) + + assert r + + +@patch('swh.indexer.storage.metrics.statsd.increment') +def test_send_metric_unit(mock_statsd): + unit_ = 'bytes' + r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') + + expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) + mock_statsd.assert_called_with( + expected_metric, 100, tags={ + 'endpoint': 'c_add', + 'object_type': 'c', + 'operation': 'add', + }) + + assert r diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py --- a/swh/indexer/tests/test_fossology_license.py +++ b/swh/indexer/tests/test_fossology_license.py @@ -36,7 +36,7 @@ ['GPL', 'AGPL'])]: mock_subprocess.check_output.return_value = intermediary_result - actual_result = compute_license(path, log=None) + actual_result = compute_license(path) self.assertEqual(actual_result, { 'licenses': output, @@ -44,7 +44,7 @@ }) -def mock_compute_license(path, log=None): +def mock_compute_license(path): """path is the content identifier """ diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py --- a/swh/indexer/tests/utils.py +++ b/swh/indexer/tests/utils.py @@ -746,12 +746,12 @@ start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given - actual_results = self.indexer.run( # checks the bytes input this time + actual_results = self.indexer.run( start, end, skip_existing=False) # no already indexed data so same result as prior test # then - self.assertTrue(actual_results) + self.assertEquals(actual_results, {'status': 'uneventful'}) def test_generate_content_get_no_result(self): """No result indexed returns False""" @@ -759,8 +759,7 @@ '0000000000000000000000000000000000000001'] start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given - actual_results = self.indexer.run( - start, end, incremental=False) + actual_results = self.indexer.run(start, end, incremental=False) # then - self.assertFalse(actual_results) + self.assertEquals(actual_results, {'status': 'uneventful'})
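
The indexers now build a single summary out of the per-batch count dicts returned by persist_index_computations(), e.g. {'content_mimetype:add': 12} from the storage layer. A minimal standalone sketch of that accumulation, kept separate from the patch; merge_summaries is a hypothetical name used only for illustration:

from typing import Dict


def merge_summaries(total: Dict[str, int], batch: Dict[str, int]) -> Dict[str, int]:
    # Accumulate per-endpoint counts (e.g. 'content_mimetype:add') across batches.
    for key, value in batch.items():
        total[key] = total.get(key, 0) + value
    return total


summary: Dict[str, int] = {}
for batch in [{'content_mimetype:add': 10}, {'content_mimetype:add': 5}]:
    merge_summaries(summary, batch)
assert summary == {'content_mimetype:add': 15}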
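send_metric() in the new swh/indexer/storage/metrics.py derives the statsd counter name and tags from the colon-separated metric key. The sketch below mirrors that mapping without importing swh.core.statsd; parse_metric is an illustrative helper, not part of the patch:

from typing import Optional, Tuple

OPERATIONS_METRIC = 'swh_indexer_storage_operations_total'
OPERATIONS_UNIT_METRIC = 'swh_indexer_storage_operations_{unit}_total'


def parse_metric(metric: str) -> Optional[Tuple[str, str, str]]:
    # 'content_mimetype:add' -> plain counter; 'content:add:bytes' -> unit counter.
    parts = metric.split(':')
    if len(parts) == 2:
        object_type, operation = parts
        return OPERATIONS_METRIC, object_type, operation
    if len(parts) == 3:
        object_type, operation, unit = parts
        return OPERATIONS_UNIT_METRIC.format(unit=unit), object_type, operation
    return None  # unknown shape; the real helper logs a warning and skips it


assert parse_metric('content_mimetype:add') == (
    'swh_indexer_storage_operations_total', 'content_mimetype', 'add')
assert parse_metric('content:add:bytes') == (
    'swh_indexer_storage_operations_bytes_total', 'content', 'add')
assert parse_metric('content') is None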
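The @timed and @process_metrics decorators are meant to be stacked on storage methods that return such count dicts, as done for content_mimetype_add and the other add endpoints in this patch. A rough usage sketch, assuming the patch is applied and swh.core is installed; DemoStorage is purely illustrative and not part of the codebase:

from swh.indexer.storage.metrics import timed, process_metrics


class DemoStorage:
    @timed
    @process_metrics
    def content_mimetype_add(self, mimetypes, conflict_update=False):
        # A real implementation writes to the database; here we only return
        # the count dict that process_metrics turns into statsd counters.
        return {'content_mimetype:add': len(mimetypes)}


summary = DemoStorage().content_mimetype_add([{'id': b'\x00' * 20}])
assert summary == {'content_mimetype:add': 1}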