diff --git a/PKG-INFO b/PKG-INFO
index e8fd3b6..4d86fdc 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,114 +1,114 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.0.54
+Version: 0.0.55
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Description: swh-indexer
============

Tools to compute multiple indexes on SWH's raw contents:

- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata

## Context

SWH currently stores around 5B contents. The table `content` holds their
checksums.

Those contents are physically stored in an object storage (using disks) and
replicated in another. Those object storages are not yet meant to be read
from. We are in the process of copying those contents over to Azure's blob
storage; we will use that opportunity to trigger computations on these
contents once they have been copied over.

## Workers

There are two types of workers:

- orchestrators (orchestrator, orchestrator-text)
- indexers (mimetype, language, ctags, fossology-license)

### Orchestrator

The orchestrator is in charge of dispatching batches of sha1 hashes to the
different indexers.

Orchestration procedure:

- receive a batch of sha1s
- split that batch into groups (according to setup)
- broadcast those groups to the indexers

There are two types of orchestrators:

- orchestrator (swh_indexer_orchestrator_content_all): receives sha1 ids
  (of contents) and broadcasts them to indexers (currently only the
  mimetype indexer)
- orchestrator-text (swh_indexer_orchestrator_content_text): receives
  batches of sha1 ids (of textual contents) and broadcasts them to indexers
  (currently the language, ctags, and fossology-license indexers)

### Indexers

An indexer is in charge of retrieving content and of indexing the extracted
information into the swh-indexer db.

There are two types of indexers:

- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes

Indexation procedure:

- receive a batch of ids
- retrieve the associated data depending on the object type
- compute an index for that object
- store the result in SWH's storage
- (and possibly do some broadcasting itself)

Current content indexers:

- mimetype (queue swh_indexer_content_mimetype): computes the mimetype,
  filters out the textual contents and broadcasts the list to the
  orchestrator-text
- language (queue swh_indexer_content_language): detects the programming
  language
- ctags (queue swh_indexer_content_ctags): tries to compute tags
  information
- fossology-license (queue swh_indexer_fossology_license): tries to compute
  the license
- metadata: translates a file into a translated_metadata dict

Current revision indexers:

- metadata: detects files containing metadata, then either retrieves their
  translated_metadata from the content_metadata table in storage or runs
  the content indexer to translate the files
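To make the indexation procedure described above concrete, here is a minimal sketch of a content indexer built on the `ContentIndexer` base class from `swh.indexer.indexer` (shown later in this diff). The line-count index itself is hypothetical; only the `filter`/`index`/`persist_index_computations` contract and the tool-registration pattern are taken from the real indexers.

```python
from swh.indexer.indexer import ContentIndexer


class ContentLineCountIndexer(ContentIndexer):
    """Toy indexer: records how many lines each content has."""

    CONFIG_BASE_FILENAME = 'indexer/line_count'  # hypothetical config file

    def prepare(self):
        super().prepare()
        self.tool = self.tools[0]

    def filter(self, ids):
        # A real indexer queries the indexer storage here to skip
        # already-indexed sha1s; this toy one treats everything as missing.
        yield from ids

    def index(self, id, data):
        # `data` is the raw content that run() fetched from the objstorage.
        return {
            'id': id,
            'indexer_configuration_id': self.tool['id'],
            'line_count': data.count(b'\n'),
        }

    def persist_index_computations(self, results, policy_update):
        # A real indexer calls a dedicated idx_storage endpoint here
        # (e.g. content_mimetype_add); none exists for this toy index.
        self.log.info('computed %d line counts', len(results))
```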
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/debian/control b/debian/control
index dec0b2f..793f9b6 100644
--- a/debian/control
+++ b/debian/control
@@ -1,47 +1,47 @@
Source: swh-indexer
Maintainer: Software Heritage developers
Section: python
Priority: optional
Build-Depends: debhelper (>= 9),
               dh-python (>= 2),
               python3-all,
               python3-chardet (>= 2.3.0~),
               python3-click,
               python3-pytest,
               python3-pygments,
               python3-magic,
               python3-setuptools,
               python3-swh.core (>= 0.0.44~),
               python3-swh.model (>= 0.0.15~),
               python3-swh.objstorage (>= 0.0.13~),
-              python3-swh.scheduler (>= 0.0.33~),
+              python3-swh.scheduler (>= 0.0.35~),
               python3-swh.storage (>= 0.0.102~),
               python3-vcversioner
Standards-Version: 3.9.6
Homepage: https://forge.softwareheritage.org/diffusion/78/

Package: python3-swh.indexer.storage
Architecture: all
Depends: python3-swh.core (>= 0.0.44~),
         python3-swh.model (>= 0.0.15~),
         python3-swh.objstorage (>= 0.0.13~),
-        python3-swh.scheduler (>= 0.0.33~),
+        python3-swh.scheduler (>= 0.0.35~),
         python3-swh.storage (>= 0.0.102~),
         ${misc:Depends},
         ${python3:Depends}
Description: Software Heritage Content Indexer Storage

Package: python3-swh.indexer
Architecture: all
Depends: python3-swh.scheduler (>= 0.0.14~),
         python3-swh.core (>= 0.0.44~),
         python3-swh.model (>= 0.0.15~),
         python3-swh.objstorage (>= 0.0.13~),
-        python3-swh.scheduler (>= 0.0.33~),
+        python3-swh.scheduler (>= 0.0.35~),
         python3-swh.storage (>= 0.0.102~),
         python3-swh.indexer.storage (= ${binary:Version}),
         universal-ctags (>= 0.8~),
         fossology-nomossa (>= 3.1~),
         ${misc:Depends},
         ${python3:Depends}
Description: Software Heritage Content Indexer
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 39e716d..376de6d 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.44
swh.model >= 0.0.15
swh.objstorage >= 0.0.13
-swh.scheduler >= 0.0.33
+swh.scheduler >= 0.0.35
swh.storage >= 0.0.102
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index e8fd3b6..4d86fdc 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,114 +1,114 @@
Metadata-Version: 2.1
Name: swh.indexer
-Version: 0.0.54
+Version: 0.0.55
Summary: Software Heritage Content Indexer
Home-page: https://forge.softwareheritage.org/diffusion/78/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
Description: swh-indexer
============

Tools to compute multiple indexes on SWH's raw contents:

- content:
  - mimetype
  - ctags
  - language
  - fossology-license
  - metadata
- revision:
  - metadata

## Context

SWH currently stores around 5B contents. The table `content` holds their
checksums.

Those contents are physically stored in an object storage (using disks) and
replicated in another. Those object storages are not yet meant to be read
from. We are in the process of copying those contents over to Azure's blob
storage; we will use that opportunity to trigger computations on these
contents once they have been copied over.
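The `swh.scheduler >= 0.0.35` bump above goes with the main change in this release: indexers and orchestrators now create follow-up tasks through the scheduler instead of calling celery tasks directly. A `next_step` task is a regular `create_task_dict` result with `next_run` removed and a `result_name` key added; `BaseIndexer.next_step()` then injects the run's results under that kwarg before scheduling. A minimal sketch, with names taken from `OriginHeadIndexer` later in this diff:

```python
from swh.scheduler.utils import create_task_dict

# Follow-up task: copy revision metadata over to the origin.
follow_up = create_task_dict(
    'origin_metadata',        # cf. OriginHeadIndexer's task attribute
    'oneshot',
    policy_update='update-dups',
)
del follow_up['next_run']     # next_step() sets this itself
follow_up['result_name'] = 'revisions_metadata'

# Hypothetical driver: the indexer's results end up in
# follow_up['arguments']['kwargs']['revisions_metadata'].
# revision_metadata_indexer.run(ids, 'update-dups', next_step=follow_up)
```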
## Workers

There are two types of workers:

- orchestrators (orchestrator, orchestrator-text)
- indexers (mimetype, language, ctags, fossology-license)

### Orchestrator

The orchestrator is in charge of dispatching batches of sha1 hashes to the
different indexers.

Orchestration procedure:

- receive a batch of sha1s
- split that batch into groups (according to setup)
- broadcast those groups to the indexers

There are two types of orchestrators:

- orchestrator (swh_indexer_orchestrator_content_all): receives sha1 ids
  (of contents) and broadcasts them to indexers (currently only the
  mimetype indexer)
- orchestrator-text (swh_indexer_orchestrator_content_text): receives
  batches of sha1 ids (of textual contents) and broadcasts them to indexers
  (currently the language, ctags, and fossology-license indexers)

### Indexers

An indexer is in charge of retrieving content and of indexing the extracted
information into the swh-indexer db.

There are two types of indexers:

- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes

Indexation procedure:

- receive a batch of ids
- retrieve the associated data depending on the object type
- compute an index for that object
- store the result in SWH's storage
- (and possibly do some broadcasting itself)

Current content indexers:

- mimetype (queue swh_indexer_content_mimetype): computes the mimetype,
  filters out the textual contents and broadcasts the list to the
  orchestrator-text
- language (queue swh_indexer_content_language): detects the programming
  language
- ctags (queue swh_indexer_content_ctags): tries to compute tags
  information
- fossology-license (queue swh_indexer_fossology_license): tries to compute
  the license
- metadata: translates a file into a translated_metadata dict

Current revision indexers:

- metadata: detects files containing metadata, then either retrieves their
  translated_metadata from the content_metadata table in storage or runs
  the content indexer to translate the files
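The split-and-broadcast step of the orchestration procedure above corresponds to `BaseOrchestratorIndexer.run()` later in this diff. The sketch below isolates that step; the batch size is illustrative, and `scheduler` is assumed to be an instance obtained via `swh.scheduler.get_scheduler`:

```python
from swh.core.utils import grouper
from swh.scheduler.utils import create_task_dict


def broadcast(scheduler, sha1s, task_name, batch_size=10):
    """Split sha1s into groups of batch_size and schedule one
    oneshot indexing task per group."""
    tasks = []
    for ids_to_send in grouper(sha1s, batch_size):
        tasks.append(create_task_dict(
            task_name,
            'oneshot',
            ids=list(ids_to_send),
            policy_update='ignore-dups',
        ))
    scheduler.create_tasks(tasks)
```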
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/requires.txt b/swh.indexer.egg-info/requires.txt index cbaafb5..6fd8fdd 100644 --- a/swh.indexer.egg-info/requires.txt +++ b/swh.indexer.egg-info/requires.txt @@ -1,13 +1,13 @@ chardet click file_magic pygments swh.core>=0.0.44 swh.model>=0.0.15 swh.objstorage>=0.0.13 -swh.scheduler>=0.0.33 +swh.scheduler>=0.0.35 swh.storage>=0.0.102 vcversioner [testing] pytest diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py index 6091175..8020958 100644 --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -1,32 +1,31 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information INDEXER_CLASSES = { 'mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer', 'language': 'swh.indexer.language.ContentLanguageIndexer', 'ctags': 'swh.indexer.ctags.CtagsIndexer', 'fossology_license': 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer', } TASK_NAMES = { 'orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents', 'orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents', 'mimetype': 'swh.indexer.tasks.ContentMimetype', 'language': 'swh.indexer.tasks.ContentLanguage', 'ctags': 'swh.indexer.tasks.Ctags', 'fossology_license': 'swh.indexer.tasks.ContentFossologyLicense', 'rehash': 'swh.indexer.tasks.RecomputeChecksums', 'revision_metadata': 'swh.indexer.tasks.RevisionMetadata', - 'origin_intrinsic_metadata': - 'swh.indexer.tasks.OriginMetadata', + 'origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata', } __all__ = [ 'INDEXER_CLASSES', 'TASK_NAMES', ] diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 3f1fb8c..9015e4a 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,492 +1,507 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import os import logging import shutil import tempfile +import datetime +from copy import deepcopy +from swh.scheduler import get_scheduler from swh.storage import get_storage from swh.core.config import SWHConfig from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.model import hashutil -from swh.scheduler.utils import get_task from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY class DiskIndexer: """Mixin intended to be used with other SomethingIndexer classes. Indexers inheriting from this class are a category of indexers which needs the disk for their computations. Note: This expects `self.working_directory` variable defined at runtime. """ def write_to_temp(self, filename, data): """Write the sha1's content in a temporary file. 
Args: sha1 (str): the sha1 name filename (str): one of sha1's many filenames data (bytes): the sha1's content to write in temporary file Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(self.working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=self.working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, 'wb') as f: f.write(data) return content_path def cleanup(self, content_path): """Remove content_path from working directory. Args: content_path (str): the file to remove """ temp_dir = os.path.dirname(content_path) shutil.rmtree(temp_dir) -class BaseIndexer(SWHConfig, - metaclass=abc.ABCMeta): +class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :func:`run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :func:`filter`: filter out data already indexed (in storage). This function is used by the orchestrator and not directly by the indexer (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer). :func:`index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :func:`persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :func:`prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :func:`check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :func:`register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ CONFIG = 'indexer/base' DEFAULT_CONFIG = { INDEXER_CFG_KEY: ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5007/' } }), - - # queue to reschedule if problem (none for no rescheduling, - # the default) - 'rescheduling_task': ('str', None), 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): """Prepare and check that the indexer is ready to run. 
""" super().__init__() self.prepare() self.check() def prepare(self): """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) if self.config['storage']: self.storage = get_storage(**self.config['storage']) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) - rescheduling_task = self.config['rescheduling_task'] - if rescheduling_task: - self.rescheduling_task = get_task(rescheduling_task) - else: - self.rescheduling_task = None _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') self.tools = list(self.register_tools(self.config['tools'])) def check(self): """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if not self.tools: raise ValueError('Tools %s is unknown, cannot continue' % self.tools) def _prepare_tool(self, tool): """Prepare the tool dict to be compliant with the storage api. """ return {'tool_%s' % key: value for key, value in tool.items()} def register_tools(self, tools): """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools (dict/[dict]): Either a dict or a list of dict. Returns: List of dict with additional id key. Raises: ValueError if not a list nor a dict. """ tools = self.config['tools'] if isinstance(tools, list): tools = map(self._prepare_tool, tools) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError('Configuration tool(s) must be a dict or list!') return self.idx_storage.indexer_configuration_add(tools) @abc.abstractmethod def filter(self, ids): """Filter missing ids for that particular indexer. Args: ids ([bytes]): list of ids Yields: iterator of missing ids """ pass @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. Args: id (bytes): identifier data (bytes): id's data from storage or objstorage depending on object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index function. policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them Returns: None """ pass - def next_step(self, results): + def next_step(self, results, task): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned by index function. + task (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. 
Returns: None """ - pass + if task: + if getattr(self, 'scheduler', None): + scheduler = self.scheduler + else: + scheduler = get_scheduler(**self.config['scheduler']) + task = deepcopy(task) + result_name = task.pop('result_name') + task['next_run'] = datetime.datetime.now() + task['arguments']['kwargs'][result_name] = self.results + scheduler.create_tasks([task]) @abc.abstractmethod - def run(self, ids, policy_update, **kwargs): + def run(self, ids, policy_update, + next_step=None, **kwargs): """Given a list of ids: - retrieves the data from the storage - executes the indexing computations - stores the results (according to policy_update) Args: ids ([bytes]): id's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ pass class ContentIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Content indexing using the run method Note: the :class:`ContentIndexer` is not an instantiable object. To use it in another context, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ - def run(self, ids, policy_update, **kwargs): + def run(self, ids, policy_update, + next_step=None, **kwargs): """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([bytes]): sha1's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore - them + policy_update (str): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore + them + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ results = [] try: for sha1 in ids: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: - self.log.warn('Content %s not found in objstorage' % - hashutil.hash_to_hex(sha1)) + self.log.warning('Content %s not found in objstorage' % + hashutil.hash_to_hex(sha1)) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) except Exception: self.log.exception( 'Problem when reading contents metadata.') - if self.rescheduling_task: - self.log.warn('Rescheduling batch') - self.rescheduling_task.delay(ids, policy_update) class OriginIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
""" - def run(self, ids, policy_update, parse_ids=False, **kwargs): + def run(self, ids, policy_update, + parse_ids=False, next_step=None, **kwargs): """Given a list of origin ids: - retrieve origins from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or (type, url) tuples. - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them - parse_ids ([bool]: If `True`, will try to convert `ids` + parse_ids (bool: If `True`, will try to convert `ids` from a human input to the valid type. + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ if parse_ids: ids = [ o.split('+', 1) if ':' in o else int(o) # type+url or id for o in ids] results = [] for id_ in ids: if isinstance(id_, (tuple, list)): if len(id_) != 2: raise TypeError('Expected a (type, url) tuple.') (type_, url) = id_ params = {'type': type_, 'url': url} elif isinstance(id_, int): params = {'id': id_} else: raise TypeError('Invalid value in "ids": %r' % id_) origin = self.storage.origin_get(params) if not origin: - self.log.warn('Origins %s not found in storage' % - list(ids)) + self.log.warning('Origins %s not found in storage' % + list(ids)) continue try: res = self.index(origin, **kwargs) if origin: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing origin %s' % id_) self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) class RevisionIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
""" - def run(self, ids, policy_update): + def run(self, ids, policy_update, next_step=None): """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results (according to policy_update) Args: - ids ([bytes]): sha1_git's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore - them + ids ([bytes or str]): sha1_git's identifier list + policy_update (str): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore + them """ results = [] + ids = [id_.encode() if isinstance(id_, str) else id_ + for id_ in ids] revs = self.storage.revision_get(ids) for rev in revs: if not rev: - self.log.warn('Revisions %s not found in storage' % - list(map(hashutil.hash_to_hex, ids))) + self.log.warning('Revisions %s not found in storage' % + list(map(hashutil.hash_to_hex, ids))) continue try: res = self.index(rev) if res: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing revision') self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) diff --git a/swh/indexer/language.py b/swh/indexer/language.py index 044d80a..5ac61ec 100644 --- a/swh/indexer/language.py +++ b/swh/indexer/language.py @@ -1,208 +1,209 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io from pygments.lexers import guess_lexer from pygments.util import ClassNotFound from chardet.universaldetector import UniversalDetector from .indexer import ContentIndexer def _cleanup_classname(classname): """Determine the language from the pygments' lexer names. """ return classname.lower().replace(' ', '-') def _read_raw(raw_content, size=2048): """Read raw content in chunk. """ bs = io.BytesIO(raw_content) while True: chunk = bs.read(size) if not chunk: break yield chunk def _detect_encoding(raw_content): """Given a raw content, try and detect its encoding. """ detector = UniversalDetector() for chunk in _read_raw(raw_content): detector.feed(chunk) if detector.done: break detector.close() return detector.result['encoding'] def compute_language_from_chunk(encoding, length, raw_content, max_size, log=None): """Determine the raw content's language. Args: encoding (str): Encoding to use to decode the content length (int): raw_content's length raw_content (bytes): raw content to work with max_size (int): max size to split the raw content at Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: if max_size <= length: raw_content = raw_content[0:max_size] content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except UnicodeDecodeError: raise except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } def compute_language(raw_content, encoding=None, log=None): """Determine the raw content's language. 
Args: raw_content (bytes): raw content to work with Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: encoding = _detect_encoding(raw_content) content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } class ContentLanguageIndexer(ContentIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ CONFIG_BASE_FILENAME = 'indexer/language' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'pygments', 'version': '2.0.1+dfsg-1.1+deb8u1', 'configuration': { 'type': 'library', 'debian-package': 'python3-pygments', 'max_content_size': 10240, }, }), } def prepare(self): super().prepare() c = self.config self.max_content_size = c['tools']['configuration']['max_content_size'] self.tool = self.tools[0] def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_language_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'] } for sha1 in ids )) def index(self, id, data): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'lang': None, } encoding = _detect_encoding(data) if not encoding: return result _len = len(data) for i in range(0, 9): max_size = self.max_content_size + i try: result = compute_language_from_chunk( encoding, _len, data, max_size, log=self.log) except UnicodeDecodeError: - self.log.warn('Decoding failed on wrong byte chunk at [0-%s]' - ', trying again at next ending byte.' % max_size) + self.log.warning( + 'Decoding failed on wrong byte chunk at [0-%s]' + ', trying again at next ending byte.' % max_size) continue # we found something, so we return it result.update({ 'id': id, 'indexer_configuration_id': self.tool['id'], }) break return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_language_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index 0e5a5a4..933716b 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,337 +1,334 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.storage import INDEXER_CFG_KEY from swh.model import hashutil class ContentMetadataIndexer(ContentIndexer): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ CONFIG_BASE_FILENAME = 'indexer/metadata' def __init__(self, tool, config): # twisted way to use the exact same config of RevisionMetadataIndexer # object that uses internally ContentMetadataIndexer self.config = config self.config['tools'] = tool super().__init__() def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the translated_metadata keys will be returned as None """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } try: mapping_name = self.tool['tool_configuration']['context'] result['translated_metadata'] = MAPPINGS[mapping_name] \ .translate(data) # a twisted way to keep result with indexer object for get_results self.results.append(result) except Exception: self.log.exception( "Problem during tool retrieval of metadata translation") return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - translated_metadata (jsonb): detected metadata policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def get_results(self): """can be called only if run method was called before Returns: list: list of content_metadata entries calculated by current indexer """ return self.results class RevisionMetadataIndexer(RevisionIndexer): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ CONFIG_BASE_FILENAME = 'indexer/metadata' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': ['NpmMapping', 'CodemetaMapping'] }, }), } ContentMetadataIndexer = ContentMetadataIndexer def prepare(self): super().prepare() self.tool = self.tools[0] def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.revision_metadata_missing(( { 'id': sha1_git, 'indexer_configuration_id': self.tool['id'], } for sha1_git in sha1_gits )) def index(self, rev): """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: rev (bytes): revision artifact from storage Returns: dict: dictionary representing a revision_metadata, with keys: - - id (bytes): rev's identifier (sha1_git) + - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - translated_metadata (bytes): dict of retrieved metadata """ try: result = { - 'id': rev['id'], + 'id': rev['id'].decode(), 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } root_dir = rev['directory'] dir_ls = self.storage.directory_ls(root_dir, recursive=False) files = [entry for entry in dir_ls if entry['type'] == 'file'] detected_files = detect_metadata(files) result['translated_metadata'] = self.translate_revision_metadata( detected_files) - except Exception: + except Exception as e: self.log.exception( - 'Problem when indexing rev') + 'Problem when indexing rev: %r', e) return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ # TODO: add functions in storage to keep data in revision_metadata self.idx_storage.revision_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def translate_revision_metadata(self, detected_files): """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files (dict): dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: dict: dict with translated metadata according to the CodeMeta vocabulary """ translated_metadata = [] tool = { 'name': 'swh-metadata-translator', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': None }, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = { INDEXER_CFG_KEY: self.idx_storage, 'objstorage': self.objstorage } for context in detected_files.keys(): tool['configuration']['context'] = context c_metadata_indexer = self.ContentMetadataIndexer(tool, config) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context]) for c in metadata_generator: # extracting translated_metadata sha1 = c['id'] sha1s_in_storage.append(sha1) local_metadata = c['translated_metadata'] # local metadata is aggregated if local_metadata: translated_metadata.append(local_metadata) sha1s_filtered = [item for item in detected_files[context] if item not in sha1s_in_storage] if sha1s_filtered: # schedule indexation of content try: c_metadata_indexer.run(sha1s_filtered, policy_update='ignore-dups') # on the fly possibility: results = c_metadata_indexer.get_results() for result in results: local_metadata = result['translated_metadata'] translated_metadata.append(local_metadata) except Exception as e: - self.log.warn("""Exception while indexing content""", e) + self.log.warning("""Exception while indexing content""", e) # transform translated_metadata into min set with swh-metadata-detector min_metadata = extract_minimal_metadata_dict(translated_metadata) return min_metadata class OriginMetadataIndexer(OriginIndexer): def filter(self, ids): return ids - def run(self, revisions_metadata, policy_update, *, origin_head_pairs): + def run(self, revisions_metadata, policy_update, *, origin_head): """Expected to be called with the result of RevisionMetadataIndexer as first argument; ie. not a list of ids as other indexers would. Args: * `revisions_metadata` (List[dict]): contains metadata from revisions, along with the respective revision ids. It is passed by RevisionMetadataIndexer via a Celery chain triggered by OriginIndexer.next_step. * `policy_update`: `'ignore-dups'` or `'update-dups'` - * `origin_head_pairs` (List[dict]): list of dictionaries with + * `origin_head` (dict): {str(origin_id): rev_id.encode()} keys `origin_id` and `revision_id`, which is the result of OriginHeadIndexer. """ - origin_head_map = {pair['origin_id']: pair['revision_id'] - for pair in origin_head_pairs} + origin_head_map = {int(origin_id): rev_id + for (origin_id, rev_id) in origin_head.items()} # Fix up the argument order. revisions_metadata has to be the # first argument because of celery.chain; the next line calls # run() with the usual order, ie. 
origin ids first. return super().run(ids=list(origin_head_map), policy_update=policy_update, revisions_metadata=revisions_metadata, origin_head_map=origin_head_map) def index(self, origin, *, revisions_metadata, origin_head_map): # Get the last revision of the origin. revision_id = origin_head_map[origin['id']] # Get the metadata of that revision, and return it for revision_metadata in revisions_metadata: if revision_metadata['id'] == revision_id: return { 'origin_id': origin['id'], 'metadata': revision_metadata['translated_metadata'], 'from_revision': revision_id, 'indexer_configuration_id': revision_metadata['indexer_configuration_id'], } - # If you get this KeyError with a message like this: - # 'foo' not in [b'foo'] - # you should check you're not using JSON as task serializer raise KeyError('%r not in %r' % (revision_id, [r['id'] for r in revisions_metadata])) def persist_index_computations(self, results, policy_update): self.idx_storage.origin_intrinsic_metadata_add( results, conflict_update=(policy_update == 'update-dups')) @click.command() @click.option('--revs', '-i', help='Default sha1_git to lookup', multiple=True) def main(revs): _git_sha1s = list(map(hashutil.hash_to_bytes, revs)) rev_metadata_indexer = RevisionMetadataIndexer() rev_metadata_indexer.run(_git_sha1s, 'update-dups') if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main() diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py index 858e75a..1b42419 100644 --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,158 +1,167 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import magic from swh.model import hashutil -from swh.scheduler import utils +from swh.scheduler import get_scheduler +from swh.scheduler.utils import create_task_dict from .indexer import ContentIndexer def compute_mimetype_encoding(raw_content): """Determine mimetype and encoding from the raw content. Args: raw_content (bytes): content's raw data Returns: A dict with mimetype and encoding key and corresponding values (as bytes). """ r = magic.detect_from_content(raw_content) return { 'mimetype': r.mime_type.encode('utf-8'), 'encoding': r.encoding.encode('utf-8'), } class ContentMimetypeIndexer(ContentIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ ADDITIONAL_CONFIG = { + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:5008', + }, + }, 'destination_task': ('str', None), 'tools': ('dict', { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" }, }), } CONFIG_BASE_FILENAME = 'indexer/mimetype' def prepare(self): super().prepare() - destination_task = self.config.get('destination_task') - if destination_task: - self.destination_task = utils.get_task(destination_task) - else: - self.destination_task = None + self.destination_task = self.config.get('destination_task') + self.scheduler = get_scheduler(**self.config['scheduler']) self.tool = self.tools[0] def filter(self, ids): """Filter out known sha1s and return only missing ones. 
""" yield from self.idx_storage.content_mimetype_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ try: properties = compute_mimetype_encoding(data) properties.update({ 'id': id, 'indexer_configuration_id': self.tool['id'], }) except TypeError: self.log.error('Detecting mimetype error for id %s' % ( hashutil.hash_to_hex(id), )) return None return properties def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_mimetype_add( results, conflict_update=(policy_update == 'update-dups')) def _filter_text(self, results): """Filter sha1 whose raw content is text. """ for result in results: if b'binary' in result['encoding']: continue yield result['id'] def next_step(self, results): """When the computations is done, we'd like to send over only text contents to the text content orchestrator. Args: results ([dict]): List of content_mimetype results, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ if self.destination_task: - self.destination_task.delay(list(self._filter_text(results))) + assert self.scheduler + self.scheduler.create_tasks([create_task_dict( + self.destination_task, + 'oneshot', + list(self._filter_text(results)) + )]) @click.command() @click.option('--path', help="Path to execute index on") def main(path): with open(path, 'rb') as f: raw_content = f.read() print(compute_mimetype_encoding(raw_content)) if __name__ == '__main__': main() diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index ea12525..d63c696 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,133 +1,143 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random -from celery import group - from swh.core.config import SWHConfig from swh.core.utils import grouper from swh.scheduler import utils +from swh.scheduler import get_scheduler +from swh.scheduler.utils import create_task_dict def get_class(clazz): """Get a symbol class dynamically by its fully qualified name string representation. """ parts = clazz.split('.') module = '.'.join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of dispatching batch of contents (filtered or not based on presence) to indexers. 
That dispatch is indexer specific, so the configuration reflects it: - when `check_presence` flag is true, filter out the contents already present for that indexer, otherwise send everything - broadcast those (filtered or not) contents to indexers in a `batch_size` fashioned For example:: indexers: mimetype: batch_size: 10 check_presence: false language: batch_size: 2 check_presence: true means: - send all contents received as batch of size 10 to the 'mimetype' indexer - send only unknown contents as batch of size 2 to the 'language' indexer. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator' # Overridable in child classes. from . import TASK_NAMES, INDEXER_CLASSES DEFAULT_CONFIG = { + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:5008', + }, + }, 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, 'check_presence': True, }, }), } - def prepare(self): - super().prepare() + def __init__(self): + super().__init__() + self.config = self.parse_config_file() self.prepare_tasks() + self.prepare_scheduler() + + def prepare_scheduler(self): + self.scheduler = get_scheduler(**self.config['scheduler']) def prepare_tasks(self): indexer_names = list(self.config['indexers']) random.shuffle(indexer_names) indexers = {} tasks = {} for name in indexer_names: if name not in self.TASK_NAMES: raise ValueError('%s must be one of %s' % ( name, ', '.join(self.TASK_NAMES))) opts = self.config['indexers'][name] indexers[name] = ( self.INDEXER_CLASSES[name], opts['check_presence'], opts['batch_size']) tasks[name] = utils.get_task(self.TASK_NAMES[name]) self.indexers = indexers self.tasks = tasks def run(self, ids): - all_results = [] - for name, (idx_class, filtering, batch_size) in self.indexers.items(): + for task_name, task_attrs in self.indexers.items(): + (idx_class, filtering, batch_size) = task_attrs if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) ids_filtered = list(indexer_class().filter(ids)) if not ids_filtered: continue else: policy_update = 'update-dups' ids_filtered = ids - celery_tasks = [] + tasks = [] for ids_to_send in grouper(ids_filtered, batch_size): - celery_task = self.tasks[name].s( + tasks.append(create_task_dict( + task_name, + 'oneshot', ids=list(ids_to_send), - policy_update=policy_update) - celery_tasks.append(celery_task) - - all_results.append(self._run_tasks(celery_tasks)) - - return all_results + policy_update=policy_update, + )) + self._create_tasks(tasks) - def _run_tasks(self, celery_tasks): - return group(celery_tasks).delay() + def _create_tasks(self, tasks): + self.scheduler.create_tasks(tasks) class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. 
""" CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py index e21d531..9de1aa0 100644 --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -1,195 +1,217 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import click import logging -from celery import chain - +from swh.scheduler import get_scheduler +from swh.scheduler.utils import create_task_dict from swh.indexer.indexer import OriginIndexer -from swh.indexer.tasks import OriginMetadata, RevisionMetadata class OriginHeadIndexer(OriginIndexer): """Origin-level indexer. This indexer is in charge of looking up the revision that acts as the "head" of an origin. In git, this is usually the commit pointed to by the 'master' branch.""" ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'origin-metadata', 'version': '0.0.1', 'configuration': {}, }), } CONFIG_BASE_FILENAME = 'indexer/origin_head' - revision_metadata_task = RevisionMetadata() - origin_intrinsic_metadata_task = OriginMetadata() + revision_metadata_task = 'revision_metadata' + origin_intrinsic_metadata_task = 'origin_metadata' def filter(self, ids): yield from ids def persist_index_computations(self, results, policy_update): """Do nothing. The indexer's results are not persistent, they should only be piped to another indexer via the orchestrator.""" pass - def next_step(self, results): + def next_step(self, results, task): """Once the head is found, call the RevisionMetadataIndexer on these revisions, then call the OriginMetadataIndexer with both the origin_id and the revision metadata, so it can copy the revision metadata to the origin's metadata. Args: results (Iterable[dict]): Iterable of return values from `index`. 
""" + super().next_step(results, task) if self.revision_metadata_task is None and \ self.origin_intrinsic_metadata_task is None: return assert self.revision_metadata_task is not None assert self.origin_intrinsic_metadata_task is not None - return chain( - self.revision_metadata_task.s( - ids=[res['revision_id'] for res in results], - policy_update='update-dups'), - self.origin_intrinsic_metadata_task.s( - origin_head_pairs=results, - policy_update='update-dups'), - )() + + # Second task to run after this one: copy the revision's metadata + # to the origin + sub_task = create_task_dict( + self.origin_intrinsic_metadata_task, + 'oneshot', + origin_head={ + str(result['origin_id']): + result['revision_id'].decode() + for result in results}, + policy_update='update-dups', + ) + del sub_task['next_run'] # Not json-serializable + + # First task to run after this one: index the metadata of the + # revision + task = create_task_dict( + self.revision_metadata_task, + 'oneshot', + ids=[res['revision_id'].decode() for res in results], + policy_update='update-dups', + next_step={ + **sub_task, + 'result_name': 'revisions_metadata'}, + ) + if getattr(self, 'scheduler', None): + scheduler = self.scheduler + else: + scheduler = get_scheduler(**self.config['scheduler']) + scheduler.create_tasks([task]) # Dispatch def index(self, origin): origin_id = origin['id'] latest_snapshot = self.storage.snapshot_get_latest(origin_id) method = getattr(self, '_try_get_%s_head' % origin['type'], None) if method is None: method = self._try_get_head_generic rev_id = method(latest_snapshot) if rev_id is None: return None result = { 'origin_id': origin_id, 'revision_id': rev_id, } return result # VCSs def _try_get_vcs_head(self, snapshot): try: if isinstance(snapshot, dict): branches = snapshot['branches'] if branches[b'HEAD']['target_type'] == 'revision': return branches[b'HEAD']['target'] except KeyError: return None _try_get_hg_head = _try_get_git_head = _try_get_vcs_head # Tarballs _archive_filename_re = re.compile( rb'^' rb'(?P.*)[-_]' rb'(?P[0-9]+(\.[0-9])*)' rb'(?P[-+][a-zA-Z0-9.~]+?)?' rb'(?P(\.[a-zA-Z0-9]+)+)' rb'$') @classmethod def _parse_version(cls, filename): """Extracts the release version from an archive filename, to get an ordering whose maximum is likely to be the last version of the software >>> OriginHeadIndexer._parse_version(b'foo') (-inf,) >>> OriginHeadIndexer._parse_version(b'foo.tar.gz') (-inf,) >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz') (0, 0, 1, 0) >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') (0, 0, 1, -1, 'beta2') >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') (0, 0, 1, 1, 'foobar') """ res = cls._archive_filename_re.match(filename) if res is None: return (float('-infinity'),) version = [int(n) for n in res.group('version').decode().split('.')] if res.group('preversion') is None: version.append(0) else: preversion = res.group('preversion').decode() if preversion.startswith('-'): version.append(-1) version.append(preversion[1:]) elif preversion.startswith('+'): version.append(1) version.append(preversion[1:]) else: assert False, res.group('preversion') return tuple(version) def _try_get_ftp_head(self, snapshot): archive_names = list(snapshot['branches']) max_archive_name = max(archive_names, key=self._parse_version) r = self._try_resolve_target(snapshot['branches'], max_archive_name) return r # Generic def _try_get_head_generic(self, snapshot): # Works on 'deposit', 'svn', and 'pypi'. 
try: if isinstance(snapshot, dict): branches = snapshot['branches'] except KeyError: return None else: return ( self._try_resolve_target(branches, b'HEAD') or self._try_resolve_target(branches, b'master') ) def _try_resolve_target(self, branches, target_name): try: target = branches[target_name] while target['target_type'] == 'alias': target = branches[target['target']] if target['target_type'] == 'revision': return target['target'] elif target['target_type'] == 'content': return None # TODO elif target['target_type'] == 'directory': return None # TODO elif target['target_type'] == 'release': return None # TODO else: assert False except KeyError: return None @click.command() @click.option('--origins', '-i', help='Origins to lookup, in the "type+url" format', multiple=True) def main(origins): rev_metadata_indexer = OriginHeadIndexer() rev_metadata_indexer.run(origins, 'update-dups', parse_ids=True) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main() diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py index d3bfcbf..d2697e0 100644 --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -1,189 +1,172 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import itertools from collections import defaultdict from swh.core import utils from swh.core.config import SWHConfig from swh.model import hashutil from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.storage import get_storage -from swh.scheduler.utils import get_task class RecomputeChecksums(SWHConfig): """Class in charge of (re)computing content's hashes. Hashes to compute are defined across 2 configuration options: compute_checksums ([str]) list of hash algorithms that py:func:`swh.model.hashutil.MultiHash.from_data` function should be able to deal with. For variable-length checksums, a desired checksum length should also be provided. Their format is : e.g: blake2:512 recompute_checksums (bool) a boolean to notify that we also want to recompute potential existing hashes specified in compute_checksums. Default to False. """ DEFAULT_CONFIG = { # The storage to read from or update metadata to 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), # The objstorage to read contents' data from 'objstorage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6', }, }), # the set of checksums that should be computed. 
# Examples: 'sha1_git', 'blake2b512', 'blake2s256' 'compute_checksums': ( 'list[str]', []), # whether checksums that already exist in the DB should be # recomputed/updated or left untouched 'recompute_checksums': ('bool', False), # Number of contents to retrieve blobs at the same time 'batch_size_retrieve_content': ('int', 10), # Number of contents to update at the same time 'batch_size_update': ('int', 100), - # Rescheduling task on error (if None, nothing is done) - 'rescheduling_task': ('str', None), } CONFIG_BASE_FILENAME = 'indexer/rehash' def __init__(self): self.config = self.parse_config_file() self.storage = get_storage(**self.config['storage']) self.objstorage = get_objstorage(**self.config['objstorage']) self.compute_checksums = self.config['compute_checksums'] self.recompute_checksums = self.config[ 'recompute_checksums'] self.batch_size_retrieve_content = self.config[ 'batch_size_retrieve_content'] self.batch_size_update = self.config[ 'batch_size_update'] self.log = logging.getLogger('swh.indexer.rehash') - rescheduling_task = self.config['rescheduling_task'] - if rescheduling_task: - self.rescheduling_task = get_task(rescheduling_task) - else: - self.rescheduling_task = None - if not self.compute_checksums: raise ValueError('Checksums list should not be empty.') def _read_content_ids(self, contents): """Read the content identifiers from the contents. """ for c in contents: h = c['sha1'] if isinstance(h, str): h = hashutil.hash_to_bytes(h) yield h def get_new_contents_metadata(self, all_contents): """Retrieve raw contents and compute new checksums on the contents. Unknown or corrupted contents are skipped. Args: all_contents ([dict]): List of contents as dictionary with the necessary primary keys checksum_algorithms ([str]): List of checksums to compute Yields: tuple of: content to update, list of checksums computed """ content_ids = self._read_content_ids(all_contents) for contents in utils.grouper(content_ids, self.batch_size_retrieve_content): contents_iter = itertools.tee(contents, 2) try: content_metadata = self.storage.content_get_metadata( [s for s in contents_iter[0]]) except Exception: self.log.exception( 'Problem when reading contents metadata.') - if self.rescheduling_task: - self.log.warn('Rescheduling batch.') - cs = [{'sha1': sha1} for sha1 in contents_iter[1]] - self.rescheduling_task.delay(cs) continue for content in content_metadata: if self.recompute_checksums: # Recompute checksums provided # in compute_checksums options checksums_to_compute = list(self.compute_checksums) else: # Compute checksums provided in compute_checksums # options not already defined for that content checksums_to_compute = [h for h in self.compute_checksums if not content.get(h)] if not checksums_to_compute: # Nothing to recompute continue try: raw_content = self.objstorage.get(content['sha1']) except ObjNotFoundError: - self.log.warn('Content %s not found in objstorage!' % - content['sha1']) + self.log.warning('Content %s not found in objstorage!' % + content['sha1']) continue content_hashes = hashutil.MultiHash.from_data( raw_content, hash_names=checksums_to_compute).digest() content.update(content_hashes) yield content, checksums_to_compute def run(self, contents): """Given a list of content: - (re)compute a given set of checksums on contents available in our object storage - update those contents with the new metadata Args: contents (dict): contents as dictionary with necessary keys. key present in such dictionary should be the ones defined in the 'primary_key' option. 
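            For instance (an illustrative input, assuming 'sha1' is the
            configured primary key):
                [{'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689'}]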
""" for data in utils.grouper( self.get_new_contents_metadata(contents), self.batch_size_update): groups = defaultdict(list) for content, keys_to_update in data: keys = ','.join(keys_to_update) groups[keys].append(content) for keys_to_update, contents in groups.items(): keys = keys_to_update.split(',') try: self.storage.content_update(contents, keys=keys) except Exception: self.log.exception('Problem during update.') - if self.rescheduling_task: - self.log.warn('Rescheduling batch.') - cs = [{'sha1': c['sha1']} for c in contents] - self.rescheduling_task.delay(cs) continue diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py index fb136a3..2c8d4a4 100644 --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,105 +1,107 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.scheduler.task import Task as SchedulerTask from .orchestrator import OrchestratorAllContentsIndexer from .orchestrator import OrchestratorTextContentsIndexer from .mimetype import ContentMimetypeIndexer from .language import ContentLanguageIndexer from .ctags import CtagsIndexer from .fossology_license import ContentFossologyLicenseIndexer from .rehash import RecomputeChecksums from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer logging.basicConfig(level=logging.INFO) class Task(SchedulerTask): def run_task(self, *args, **kwargs): indexer = self.Indexer().run(*args, **kwargs) - return indexer.results + if hasattr(indexer, 'results'): # indexer tasks + return indexer.results + return indexer class OrchestratorAllContents(Task): """Main task in charge of reading batch contents (of any type) and broadcasting them back to other tasks. """ task_queue = 'swh_indexer_orchestrator_content_all' Indexer = OrchestratorAllContentsIndexer class OrchestratorTextContents(Task): """Main task in charge of reading batch contents (of type text) and broadcasting them back to other tasks. """ task_queue = 'swh_indexer_orchestrator_content_text' Indexer = OrchestratorTextContentsIndexer class RevisionMetadata(Task): task_queue = 'swh_indexer_revision_metadata' serializer = 'msgpack' Indexer = RevisionMetadataIndexer class OriginMetadata(Task): task_queue = 'swh_indexer_origin_intrinsic_metadata' Indexer = OriginMetadataIndexer class ContentMimetype(Task): """Task which computes the mimetype, encoding from the sha1's content. """ task_queue = 'swh_indexer_content_mimetype' Indexer = ContentMimetypeIndexer class ContentLanguage(Task): """Task which computes the language from the sha1's content. """ task_queue = 'swh_indexer_content_language' def run_task(self, *args, **kwargs): ContentLanguageIndexer().run(*args, **kwargs) class Ctags(Task): """Task which computes ctags from the sha1's content. """ task_queue = 'swh_indexer_content_ctags' Indexer = CtagsIndexer class ContentFossologyLicense(Task): """Task which computes licenses from the sha1's content. """ task_queue = 'swh_indexer_content_fossology_license' Indexer = ContentFossologyLicenseIndexer class RecomputeChecksums(Task): """Task which recomputes hashes and possibly new ones. 
""" task_queue = 'swh_indexer_content_rehash' Indexer = RecomputeChecksums diff --git a/swh/indexer/tests/test_language.py b/swh/indexer/tests/test_language.py index 0c50636..166cc46 100644 --- a/swh/indexer/tests/test_language.py +++ b/swh/indexer/tests/test_language.py @@ -1,109 +1,107 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from swh.indexer import language from swh.indexer.language import ContentLanguageIndexer from swh.indexer.tests.test_utils import MockObjStorage class _MockIndexerStorage(): """Mock storage to simplify reading indexers' outputs. """ def content_language_add(self, languages, conflict_update=None): self.state = languages self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ 'id': 20, }] class TestLanguageIndexer(ContentLanguageIndexer): """Specific language whose configuration is enough to satisfy the indexing tests. """ def prepare(self): self.config = { 'destination_task': None, - 'rescheduling_task': None, 'tools': { 'name': 'pygments', 'version': '2.0.1+dfsg-1.1+deb8u1', 'configuration': { 'type': 'library', 'debian-package': 'python3-pygments', 'max_content_size': 10240, }, } } self.idx_storage = _MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None - self.rescheduling_task = self.config['rescheduling_task'] self.tool_config = self.config['tools']['configuration'] self.max_content_size = self.tool_config['max_content_size'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] class Language(unittest.TestCase): """ Tests pygments tool for language detection """ def setUp(self): self.maxDiff = None def test_compute_language_none(self): # given self.content = "" self.declared_language = { 'lang': None } # when result = language.compute_language(self.content) # then self.assertEqual(self.declared_language, result) def test_index_content_language_python(self): # given # testing python sha1s = ['02fb2c89e14f7fab46701478c83779c7beb7b069'] lang_indexer = TestLanguageIndexer() # when lang_indexer.run(sha1s, policy_update='ignore-dups') results = lang_indexer.idx_storage.state expected_results = [{ 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069', 'indexer_configuration_id': 20, 'lang': 'python' }] # then self.assertEqual(expected_results, results) def test_index_content_language_c(self): # given # testing c sha1s = ['103bc087db1d26afc3a0283f38663d081e9b01e6'] lang_indexer = TestLanguageIndexer() # when lang_indexer.run(sha1s, policy_update='ignore-dups') results = lang_indexer.idx_storage.state expected_results = [{ 'id': '103bc087db1d26afc3a0283f38663d081e9b01e6', 'indexer_configuration_id': 20, 'lang': 'c' }] # then self.assertEqual('c', results[0]['lang']) self.assertEqual(expected_results, results) diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 56fffc3..6951af9 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,384 +1,378 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License 
version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from swh.indexer.metadata_dictionary import CROSSWALK_TABLE, MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.metadata import ContentMetadataIndexer from swh.indexer.metadata import RevisionMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage, MockStorage from swh.indexer.tests.test_utils import MockIndexerStorage class TestContentMetadataIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def prepare(self): - self.config.update({ - 'rescheduling_task': None, - }) self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None - self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class TestRevisionMetadataIndexer(RevisionMetadataIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. """ ContentMetadataIndexer = TestContentMetadataIndexer def prepare(self): self.config = { - 'rescheduling_task': None, 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:9999', } }, 'tools': { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': 'NpmMapping' } } } self.storage = MockStorage() self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None - self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class Metadata(unittest.TestCase): """ Tests metadata_mock_tool tool for Metadata detection """ def setUp(self): """ shows the entire diff in the results """ self.maxDiff = None self.content_tool = { 'name': 'swh-metadata-translator', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': 'NpmMapping' } } MockIndexerStorage.added_data = [] def test_crosstable(self): self.assertEqual(CROSSWALK_TABLE['NodeJS'], { 'repository': 'codeRepository', 'os': 'operatingSystem', 'cpu': 'processorRequirements', 'engines': 'processorRequirements', 'dependencies': 'softwareRequirements', 'bundleDependencies': 'softwareRequirements', 'bundledDependencies': 'softwareRequirements', 'peerDependencies': 'softwareRequirements', 'author': 'creator', 'author.email': 'email', 'author.name': 'name', 'contributor': 'contributor', 'keywords': 'keywords', 'license': 'license', 'version': 'version', 'description': 'description', 'name': 'name', 'devDependencies': 'softwareSuggestions', 'optionalDependencies': 'softwareSuggestions', 'bugs': 'issueTracker', 'homepage': 'url' }) def test_compute_metadata_none(self): """ testing content empty content is empty should return None """ # given content = b"" # None if no metadata was found or an error occurred declared_metadata = None # when result = MAPPINGS["NpmMapping"].translate(content) # then self.assertEqual(declared_metadata, result) def test_compute_metadata_npm(self): """ testing only computation of metadata with hard_mapping_npm """ # given content = b""" { "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": 
"https://github.com/moranegg/metadata_test" } } """ declared_metadata = { 'name': 'test_metadata', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} } # when result = MAPPINGS["NpmMapping"].translate(content) # then self.assertEqual(declared_metadata, result) def test_extract_minimal_metadata_dict(self): """ Test the creation of a coherent minimal metadata set """ # given metadata_list = [{ 'name': 'test_1', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} }, { 'name': 'test_0_1', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} }, { 'name': 'test_metadata', 'version': '0.0.2', 'author': 'moranegg', 'other': {} }] # when results = extract_minimal_metadata_dict(metadata_list) # then expected_results = { "developmentStatus": None, "version": ['0.0.2'], "operatingSystem": None, "description": ['Simple package.json test for indexer'], "keywords": None, "issueTracker": None, "name": ['test_1', 'test_0_1', 'test_metadata'], "author": ['moranegg'], "relatedLink": None, "url": None, "license": None, "maintainer": None, "email": None, "softwareRequirements": None, "identifier": None, "codeRepository": [{ 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }] } self.assertEqual(expected_results, results) def test_index_content_metadata_npm(self): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ # given sha1s = ['26a9f72a7c87cc9205725cfd879f514ff4f3d8d5', 'd4c647f0fc257591cc9ba1722484229780d1c607', '02fb2c89e14f7fab46701478c83779c7beb7b069'] # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping metadata_indexer = TestContentMetadataIndexer( tool=self.content_tool, config={}) # when metadata_indexer.run(sha1s, policy_update='ignore-dups') results = metadata_indexer.idx_storage.added_data expected_results = [('content_metadata', False, [{ 'indexer_configuration_id': 30, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'id': '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5' }, { 'indexer_configuration_id': 30, 'translated_metadata': { 'softwareRequirements': { 'JSONStream': '~1.3.1', 'abbrev': '~1.1.0', 'ansi-regex': '~2.1.1', 'ansicolors': '~0.3.2', 'ansistyles': '~0.1.3' }, 'issueTracker': { 'url': 'https://github.com/npm/npm/issues' }, 'creator': 'Isaac Z. 
Schlueter (http://blog.izs.me)', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/npm/npm' }, 'description': 'a package manager for JavaScript', 'softwareSuggestions': { 'tacks': '~1.2.6', 'tap': '~10.3.2' }, 'license': 'Artistic-2.0', 'version': '5.0.3', 'other': { 'preferGlobal': True, 'config': { 'publishtest': False } }, 'name': 'npm', 'keywords': [ 'install', 'modules', 'package manager', 'package.json' ], 'url': 'https://docs.npmjs.com/' }, 'id': 'd4c647f0fc257591cc9ba1722484229780d1c607' }, { 'indexer_configuration_id': 30, 'translated_metadata': None, 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069' }])] # The assertion below returns False sometimes because of nested lists self.assertEqual(expected_results, results) def test_detect_metadata_package_json(self): # given df = [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'cde' }] # when results = detect_metadata(df) expected_results = { 'NpmMapping': [ b'cde' ] } # then self.assertEqual(expected_results, results) def test_revision_metadata_indexer(self): metadata_indexer = TestRevisionMetadataIndexer() sha1_gits = [ b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', ] metadata_indexer.run(sha1_gits, 'update-dups') results = metadata_indexer.idx_storage.added_data expected_results = [('revision_metadata', True, [{ - 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': { 'identifier': None, 'maintainer': None, 'url': [ 'https://github.com/librariesio/yarn-parser#readme' ], 'codeRepository': [{ 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }], 'author': ['Andrew Nesbitt'], 'license': ['AGPL-3.0'], 'version': ['1.0.0'], 'description': [ 'Tiny web service for parsing yarn.lock files' ], 'relatedLink': None, 'developmentStatus': None, 'operatingSystem': None, 'issueTracker': [{ 'url': 'https://github.com/librariesio/yarn-parser/issues' }], 'softwareRequirements': [{ 'express': '^4.14.0', 'yarn': '^0.21.0', 'body-parser': '^1.15.2' }], 'name': ['yarn-parser'], 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], 'email': None }, 'indexer_configuration_id': 7 }])] # then self.assertEqual(expected_results, results) diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py index 2082815..4632bcb 100644 --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -1,153 +1,150 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from swh.indexer.mimetype import ContentMimetypeIndexer from swh.indexer.tests.test_utils import MockObjStorage class _MockIndexerStorage(): """Mock storage to simplify reading indexers' outputs. """ def content_mimetype_add(self, mimetypes, conflict_update=None): self.state = mimetypes self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ 'id': 10, }] class TestMimetypeIndexer(ContentMimetypeIndexer): """Specific mimetype whose configuration is enough to satisfy the indexing tests. 
""" def prepare(self): self.config = { 'destination_task': None, - 'rescheduling_task': None, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" }, }, } self.idx_storage = _MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() - self.destination_task = None - self.rescheduling_task = self.config['rescheduling_task'] self.destination_task = self.config['destination_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] class TestMimetypeIndexerUnknownToolStorage(TestMimetypeIndexer): """Specific mimetype whose configuration is not enough to satisfy the indexing tests. """ def prepare(self): super().prepare() self.tools = None class TestMimetypeIndexerWithErrors(unittest.TestCase): def test_wrong_unknown_configuration_tool(self): """Indexer with unknown configuration tool should fail the check""" with self.assertRaisesRegex(ValueError, 'Tools None is unknown'): TestMimetypeIndexerUnknownToolStorage() class TestMimetypeIndexerTest(unittest.TestCase): def setUp(self): self.indexer = TestMimetypeIndexer() def test_index_no_update(self): # given sha1s = [ '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', '688a5ef812c53907562fe379d4b3851e69c7cb15', ] # when self.indexer.run(sha1s, policy_update='ignore-dups') # then expected_results = [{ 'id': '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', 'indexer_configuration_id': 10, 'mimetype': b'text/plain', 'encoding': b'us-ascii', }, { 'id': '688a5ef812c53907562fe379d4b3851e69c7cb15', 'indexer_configuration_id': 10, 'mimetype': b'text/plain', 'encoding': b'us-ascii', }] self.assertFalse(self.indexer.idx_storage.conflict_update) self.assertEqual(expected_results, self.indexer.idx_storage.state) def test_index_update(self): # given sha1s = [ '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', '688a5ef812c53907562fe379d4b3851e69c7cb15', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', # empty content ] # when self.indexer.run(sha1s, policy_update='update-dups') # then expected_results = [{ 'id': '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', 'indexer_configuration_id': 10, 'mimetype': b'text/plain', 'encoding': b'us-ascii', }, { 'id': '688a5ef812c53907562fe379d4b3851e69c7cb15', 'indexer_configuration_id': 10, 'mimetype': b'text/plain', 'encoding': b'us-ascii', }, { 'id': 'da39a3ee5e6b4b0d3255bfef95601890afd80709', 'indexer_configuration_id': 10, 'mimetype': b'application/x-empty', 'encoding': b'binary', }] self.assertTrue(self.indexer.idx_storage.conflict_update) self.assertEqual(expected_results, self.indexer.idx_storage.state) def test_index_one_unknown_sha1(self): # given sha1s = ['688a5ef812c53907562fe379d4b3851e69c7cb15', '799a5ef812c53907562fe379d4b3851e69c7cb15', # unknown '800a5ef812c53907562fe379d4b3851e69c7cb15'] # unknown # when self.indexer.run(sha1s, policy_update='update-dups') # then expected_results = [{ 'id': '688a5ef812c53907562fe379d4b3851e69c7cb15', 'indexer_configuration_id': 10, 'mimetype': b'text/plain', 'encoding': b'us-ascii', }] self.assertTrue(self.indexer.idx_storage.conflict_update) self.assertEqual(expected_results, self.indexer.idx_storage.state) diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py index c9804e3..0fa2da9 100644 --- a/swh/indexer/tests/test_orchestrator.py +++ b/swh/indexer/tests/test_orchestrator.py @@ -1,199 +1,210 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution 
# License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import celery from swh.indexer.orchestrator import BaseOrchestratorIndexer from swh.indexer.indexer import BaseIndexer from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage -from swh.scheduler.tests.celery_testing import CeleryTestFixture -from swh.indexer.tests import start_worker_thread +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture class BaseTestIndexer(BaseIndexer): ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'foo', 'version': 'bar', 'configuration': {} }), } def prepare(self): self.idx_storage = MockIndexerStorage() self.storage = MockStorage() def check(self): pass def filter(self, ids): self.filtered.append(ids) return ids def run(self, ids, policy_update): return self.index(ids) def index(self, ids): self.indexed.append(ids) return [id_ + '_indexed_by_' + self.__class__.__name__ for id_ in ids] def persist_index_computations(self, result, policy_update): self.persisted = result class Indexer1(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '1' in id_]) class Indexer2(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '2' in id_]) class Indexer3(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '3' in id_]) @celery.task def indexer1_task(*args, **kwargs): return Indexer1().run(*args, **kwargs) @celery.task def indexer2_task(*args, **kwargs): return Indexer2().run(*args, **kwargs) @celery.task def indexer3_task(self, *args, **kwargs): return Indexer3().run(*args, **kwargs) class TestOrchestrator12(BaseOrchestratorIndexer): TASK_NAMES = { 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', } INDEXER_CLASSES = { 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', } def __init__(self): super().__init__() self.running_tasks = [] - def prepare(self): - self.config = { - 'indexers': { - 'indexer1': { - 'batch_size': 2, - 'check_presence': True, - }, - 'indexer2': { - 'batch_size': 2, - 'check_presence': True, - }, - } - } - self.prepare_tasks() + def parse_config_file(self): + return { + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:9999', + }, + }, + 'indexers': { + 'indexer1': { + 'batch_size': 2, + 'check_presence': True, + }, + 'indexer2': { + 'batch_size': 2, + 'check_presence': True, + }, + } + } class MockedTestOrchestrator12(TestOrchestrator12): - def _run_tasks(self, celery_tasks): - self.running_tasks.extend(celery_tasks) + def __init__(self): + super().__init__() + self.created_tasks = [] + def _create_tasks(self, celery_tasks): + self.created_tasks.extend(celery_tasks) + + def prepare_scheduler(self): + pass + + +class OrchestratorTest(SchedulerTestFixture, unittest.TestCase): + def setUp(self): + super().setUp() + self.add_scheduler_task_type( + 'indexer1', + 'swh.indexer.tests.test_orchestrator.indexer1_task') + self.add_scheduler_task_type( + 'indexer2', + 'swh.indexer.tests.test_orchestrator.indexer2_task') -class OrchestratorTest(CeleryTestFixture, unittest.TestCase): def test_orchestrator_filter(self): - with 
start_worker_thread(): - o = TestOrchestrator12() - o.prepare() - promises = o.run(['id12', 'id2']) - results = [] - for promise in reversed(promises): - results.append(promise.get(timeout=10)) - self.assertCountEqual( - results, - [[['id12_indexed_by_Indexer1']], - [['id12_indexed_by_Indexer2', - 'id2_indexed_by_Indexer2']]]) - self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) - self.assertEqual(Indexer1.indexed, [['id12']]) + o = TestOrchestrator12() + o.scheduler = self.scheduler + o.run(['id12', 'id2']) + self.assertEqual(Indexer2.indexed, []) + self.assertEqual(Indexer1.indexed, []) + self.run_ready_tasks() + self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) + self.assertEqual(Indexer1.indexed, [['id12']]) class MockedOrchestratorTest(unittest.TestCase): maxDiff = None def test_mocked_orchestrator_filter(self): o = MockedTestOrchestrator12() - o.prepare() o.run(['id12', 'id2']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2'], + 'policy_update': 'ignore-dups'}}, + 'policy': 'oneshot'}, ]) def test_mocked_orchestrator_batch(self): o = MockedTestOrchestrator12() - o.prepare() o.run(['id12', 'id2a', 'id2b', 'id2c']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2a'], + 'policy_update': 'ignore-dups'}}, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id2b', 'id2c'], + 'policy_update': 'ignore-dups'}}, + 'policy': 'oneshot'}, ]) diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py index 84e218b..1b82b64 100644 --- a/swh/indexer/tests/test_origin_metadata.py +++ 
b/swh/indexer/tests/test_origin_metadata.py @@ -1,126 +1,142 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import time import logging import unittest from celery import task from swh.indexer.metadata import OriginMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage, MockStorage from swh.indexer.tests.test_utils import MockIndexerStorage from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer -from swh.scheduler.tests.celery_testing import CeleryTestFixture -from swh.indexer.tests import start_worker_thread +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture class TestOriginMetadataIndexer(OriginMetadataIndexer): def prepare(self): self.config = { 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:9999', } }, 'tools': { 'name': 'origin-metadata', 'version': '0.0.1', 'configuration': {} } } self.storage = MockStorage() self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] @task def revision_metadata_test_task(*args, **kwargs): indexer = TestRevisionMetadataIndexer() indexer.run(*args, **kwargs) return indexer.results @task def origin_intrinsic_metadata_test_task(*args, **kwargs): indexer = TestOriginMetadataIndexer() indexer.run(*args, **kwargs) return indexer.results class TestOriginHeadIndexer(TestOriginHeadIndexer): - revision_metadata_task = revision_metadata_test_task - origin_intrinsic_metadata_task = origin_intrinsic_metadata_test_task + revision_metadata_task = 'revision_metadata_test_task' + origin_intrinsic_metadata_task = 'origin_intrinsic_metadata_test_task' -class TestOriginMetadata(CeleryTestFixture, unittest.TestCase): +class TestOriginMetadata(SchedulerTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.maxDiff = None MockIndexerStorage.added_data = [] + self.add_scheduler_task_type( + 'revision_metadata_test_task', + 'swh.indexer.tests.test_origin_metadata.' + 'revision_metadata_test_task') + self.add_scheduler_task_type( + 'origin_intrinsic_metadata_test_task', + 'swh.indexer.tests.test_origin_metadata.' 
+ 'origin_intrinsic_metadata_test_task') + TestRevisionMetadataIndexer.scheduler = self.scheduler + + def tearDown(self): + del TestRevisionMetadataIndexer.scheduler + super().tearDown() def test_pipeline(self): indexer = TestOriginHeadIndexer() - with start_worker_thread(): - promise = indexer.run( - ["git+https://github.com/librariesio/yarn-parser"], - policy_update='update-dups', - parse_ids=True) - promise.get() + indexer.scheduler = self.scheduler + indexer.run( + ["git+https://github.com/librariesio/yarn-parser"], + policy_update='update-dups', + parse_ids=True) + + self.run_ready_tasks() # Run the first task + time.sleep(0.1) # Give it time to complete and schedule the 2nd one + self.run_ready_tasks() # Run the second task metadata = { 'identifier': None, 'maintainer': None, 'url': [ 'https://github.com/librariesio/yarn-parser#readme' ], 'codeRepository': [{ 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }], 'author': ['Andrew Nesbitt'], 'license': ['AGPL-3.0'], 'version': ['1.0.0'], 'description': [ 'Tiny web service for parsing yarn.lock files' ], 'relatedLink': None, 'developmentStatus': None, 'operatingSystem': None, 'issueTracker': [{ 'url': 'https://github.com/librariesio/yarn-parser/issues' }], 'softwareRequirements': [{ 'express': '^4.14.0', 'yarn': '^0.21.0', 'body-parser': '^1.15.2' }], 'name': ['yarn-parser'], 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], 'email': None } rev_metadata = { - 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': metadata, 'indexer_configuration_id': 7, } origin_metadata = { 'origin_id': 54974445, - 'from_revision': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'from_revision': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'metadata': metadata, 'indexer_configuration_id': 7, } expected_results = [ ('origin_intrinsic_metadata', True, [origin_metadata]), ('revision_metadata', True, [rev_metadata])] results = list(indexer.idx_storage.added_data) self.assertCountEqual(expected_results, results) diff --git a/swh/indexer/tests/test_utils.py b/swh/indexer/tests/test_utils.py index 0c519b9..826a909 100644 --- a/swh/indexer/tests/test_utils.py +++ b/swh/indexer/tests/test_utils.py @@ -1,409 +1,410 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.objstorage.exc import ObjNotFoundError ORIGINS = [ { 'id': 52189575, 'lister': None, 'project': None, 'type': 'git', 'url': 'https://github.com/SoftwareHeritage/swh-storage'}, { 'id': 4423668, 'lister': None, 'project': None, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/3dldf'}, { 'id': 77775770, 'lister': None, 'project': None, 'type': 'deposit', 'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'}, { 'id': 85072327, 'lister': None, 'project': None, 'type': 'pypi', 'url': 'https://pypi.org/project/limnoria/'}, { 'id': 49908349, 'lister': None, 'project': None, 'type': 'svn', 'url': 'http://0-512-md.googlecode.com/svn/'}, { 'id': 54974445, 'lister': None, 'project': None, 'type': 'git', 'url': 'https://github.com/librariesio/yarn-parser'}, ] SNAPSHOTS = { 52189575: { 'branches': { b'refs/heads/add-revision-origin-cache': { 'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0' b's\xe7/\xe9l\x1e', 'target_type': 'revision'}, b'HEAD': { 'target': 
b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}' b'\xac\xefrm', 'target_type': 'revision'}, b'refs/tags/v0.0.103': { 'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+' b'\x0f\xdd', 'target_type': 'release'}, }}, 4423668: { 'branches': { b'3DLDF-1.1.4.tar.gz': { 'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc' b'"G\x99\x11', 'target_type': 'revision'}, b'3DLDF-2.0.2.tar.gz': { 'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e=' b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V', 'target_type': 'revision'}, b'3DLDF-2.0.3-examples.tar.gz': { 'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97' b'\xfe\xadZ\x80\x80\xc1\x83\xff', 'target_type': 'revision'}, b'3DLDF-2.0.3.tar.gz': { 'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee' b'\xcc\x1a\xb4`\x8c\x8by', 'target_type': 'revision'}, b'3DLDF-2.0.tar.gz': { 'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G' b'\xd3\xd1m', b'target_type': 'revision'} }}, 77775770: { 'branches': { b'master': { 'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{' b'\xa6\xe9\x99\xb1\x9e]q\xeb', 'target_type': 'revision'} }, 'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV" b"\x1d\r "}, 85072327: { 'branches': { b'HEAD': { 'target': b'releases/2018.09.09', 'target_type': 'alias'}, b'releases/2018.09.01': { 'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d' b'\xbb\xdfF\xfdw\xcf', 'target_type': 'revision'}, b'releases/2018.09.09': { 'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k' b'A\x10\x9d\xc5\xfa2\xf8t', 'target_type': 'revision'}}, 'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay' b'\x12\x9e\xd6\xb3'}, 49908349: { 'branches': { b'master': { 'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8' b'\xc9\xad#.\x1bw=\x18', 'target_type': 'revision'}}, 'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7' b'\x05\xea\xb8\x1f\xc4H\xf4s'}, 54974445: { 'branches': { b'HEAD': { 'target': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'target_type': 'revision'}}} } class MockObjStorage: """Mock an swh-objstorage objstorage with predefined contents. """ data = {} def __init__(self): self.data = { '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': b'this is some text', '688a5ef812c53907562fe379d4b3851e69c7cb15': b'another text', '8986af901dd2043044ce8f0d8fc039153641cf17': b'yet another text', '02fb2c89e14f7fab46701478c83779c7beb7b069': b""" import unittest import logging from swh.indexer.mimetype import ContentMimetypeIndexer from swh.indexer.tests.test_utils import MockObjStorage class MockStorage(): def content_mimetype_add(self, mimetypes): self.state = mimetypes self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ 'id': 10, }] """, '103bc087db1d26afc3a0283f38663d081e9b01e6': b""" #ifndef __AVL__ #define __AVL__ typedef struct _avl_tree avl_tree; typedef struct _data_t { int content; } data_t; """, '93666f74f1cf635c8c8ac118879da6ec5623c410': b""" (should 'pygments (recognize 'lisp 'easily)) """, '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5': b""" { "name": "test_metadata", "version": "0.0.1", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """, 'd4c647f0fc257591cc9ba1722484229780d1c607': b""" { "version": "5.0.3", "name": "npm", "description": "a package manager for JavaScript", "keywords": [ "install", "modules", "package manager", "package.json" ], "preferGlobal": true, "config": { "publishtest": false }, "homepage": "https://docs.npmjs.com/", "author": "Isaac Z. 
Schlueter (http://blog.izs.me)", "repository": { "type": "git", "url": "https://github.com/npm/npm" }, "bugs": { "url": "https://github.com/npm/npm/issues" }, "dependencies": { "JSONStream": "~1.3.1", "abbrev": "~1.1.0", "ansi-regex": "~2.1.1", "ansicolors": "~0.3.2", "ansistyles": "~0.1.3" }, "devDependencies": { "tacks": "~1.2.6", "tap": "~10.3.2" }, "license": "Artistic-2.0" } """, 'a7ab314d8a11d2c93e3dcf528ca294e7b431c449': b""" """, 'da39a3ee5e6b4b0d3255bfef95601890afd80709': b'', } def __iter__(self): yield from self.data.keys() def __contains__(self, sha1): return self.data.get(sha1) is not None def get(self, sha1): raw_content = self.data.get(sha1) if raw_content is None: raise ObjNotFoundError(sha1) return raw_content class MockIndexerStorage(): """Mock an swh-indexer storage. """ added_data = [] def indexer_configuration_add(self, tools): tool = tools[0] if tool['tool_name'] == 'swh-metadata-translator': return [{ 'id': 30, 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', 'tool_configuration': { 'type': 'local', 'context': 'NpmMapping' }, }] elif tool['tool_name'] == 'swh-metadata-detector': return [{ 'id': 7, 'tool_name': 'swh-metadata-detector', 'tool_version': '0.0.1', 'tool_configuration': { 'type': 'local', 'context': 'NpmMapping' }, }] elif tool['tool_name'] == 'origin-metadata': return [{ 'id': 8, 'tool_name': 'origin-metadata', 'tool_version': '0.0.1', 'tool_configuration': {}, }] else: assert False, 'Unknown tool {tool_name}'.format(**tool) def content_metadata_missing(self, sha1s): yield from [] def content_metadata_add(self, metadata, conflict_update=None): self.added_data.append( ('content_metadata', conflict_update, metadata)) def revision_metadata_add(self, metadata, conflict_update=None): self.added_data.append( ('revision_metadata', conflict_update, metadata)) def origin_intrinsic_metadata_add(self, metadata, conflict_update=None): self.added_data.append( ('origin_intrinsic_metadata', conflict_update, metadata)) def content_metadata_get(self, sha1s): return [{ 'tool': { 'configuration': { 'type': 'local', 'context': 'NpmMapping' }, 'version': '0.0.1', 'id': 6, 'name': 'swh-metadata-translator' }, 'id': b'cde', 'translated_metadata': { 'issueTracker': { 'url': 'https://github.com/librariesio/yarn-parser/issues' }, 'version': '1.0.0', 'name': 'yarn-parser', 'author': 'Andrew Nesbitt', 'url': 'https://github.com/librariesio/yarn-parser#readme', 'processorRequirements': {'node': '7.5'}, 'other': { 'scripts': { 'start': 'node index.js' }, 'main': 'index.js' }, 'license': 'AGPL-3.0', 'keywords': ['yarn', 'parse', 'lock', 'dependencies'], 'codeRepository': { 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }, 'description': 'Tiny web service for parsing yarn.lock files', 'softwareRequirements': { 'yarn': '^0.21.0', 'express': '^4.14.0', 'body-parser': '^1.15.2'} } }] class MockStorage(): """Mock a real swh-storage storage to simplify reading indexers' outputs. """ def origin_get(self, id_): for origin in ORIGINS: for (k, v) in id_.items(): if origin[k] != v: break else: - # This block is run iff we didn't break, ie. if all supplied - # parts of the id are set to the expected value. + # This block is run if and only if we didn't break, + # ie. if all supplied parts of the id are set to the + # expected value. 
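                # (for/else: the else branch runs only when the loop was
                # not left through a break.) Illustrative call, using an
                # origin from ORIGINS above:
                #   origin_get({'type': 'git',
                #               'url': 'https://github.com/librariesio'
                #                      '/yarn-parser'})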
return origin assert False, id_ def snapshot_get_latest(self, origin_id): if origin_id in SNAPSHOTS: return SNAPSHOTS[origin_id] else: assert False, origin_id def revision_get(self, revisions): return [{ 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'committer': { 'id': 26, 'name': b'Andrew Nesbitt', 'fullname': b'Andrew Nesbitt <andrewnez@gmail.com>', 'email': b'andrewnez@gmail.com' }, 'synthetic': False, 'date': { 'negative_utc': False, 'timestamp': { 'seconds': 1487596456, 'microseconds': 0 }, 'offset': 0 }, 'directory': b'10' }] def directory_ls(self, directory, recursive=False, cur=None): # with directory: b'\x9d', return [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'10', 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'10', 'sha1': b'cde' }, { 'dir_id': b'10', 'target': b'11', 'type': 'dir', 'length': None, 'name': b'.github', 'sha1': None, 'perms': 16384, 'sha1_git': None, 'status': None, 'sha256': None }] diff --git a/version.txt b/version.txt index 3003c66..3179a09 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.54-0-g4b44f3a \ No newline at end of file +v0.0.55-0-g178870d \ No newline at end of file
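For reference, the two-step scheduling pattern this patch introduces in OriginHeadIndexer.next_step() can be sketched standalone as below. This is a minimal illustration rather than part of the patch: the task type names and scheduler URL are invented, the example ids come from the test fixtures above, and the swh.scheduler entry points (create_task_dict, get_scheduler, create_tasks) are assumed to behave as the patched code uses them.

from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_task_dict


def schedule_metadata_pipeline(scheduler, results):
    """Condensed restatement of OriginHeadIndexer.next_step() above."""
    # Second step: copy each revision's metadata over to its origin.
    sub_task = create_task_dict(
        'origin-intrinsic-metadata',  # invented task type name
        'oneshot',
        origin_head={str(r['origin_id']): r['revision_id'].decode()
                     for r in results},
        policy_update='update-dups')
    del sub_task['next_run']  # a datetime; not json-serializable

    # First step: index the revisions' metadata. Embedding the second
    # step under 'next_step' asks the scheduler to run it afterwards,
    # forwarding this task's results as the keyword named by
    # 'result_name'.
    task = create_task_dict(
        'revision-metadata',  # invented task type name
        'oneshot',
        ids=[r['revision_id'].decode() for r in results],
        policy_update='update-dups',
        next_step={**sub_task, 'result_name': 'revisions_metadata'})
    scheduler.create_tasks([task])


if __name__ == '__main__':
    scheduler = get_scheduler(cls='remote',
                              args={'url': 'http://localhost:5008/'})  # invented URL
    schedule_metadata_pipeline(scheduler, [
        {'origin_id': 54974445,
         'revision_id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'}])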