diff --git a/PKG-INFO b/PKG-INFO index 3d6878b..2bd931f 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.20 +Version: 0.0.21 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/data/elastic-template.json b/data/elastic-template.json index 4e06dfc..ed4b172 100644 --- a/data/elastic-template.json +++ b/data/elastic-template.json @@ -1,52 +1,51 @@ { "order": 0, "index_patterns": ["swh-tasks-*"], "settings": { "index": { - "number_of_shards": "1", "codec": "best_compression", "refresh_interval": "30s" } }, "mappings" : { "task" : { "_source" : { "enabled": true}, "properties": { "task_id": {"type": "double"}, "task_policy": {"type": "text"}, "task_status": {"type": "text"}, "task_run_id": {"type": "double"}, "arguments": { "type": "object", "properties" : { "args": { "type": "nested" }, "kwargs": { "type": "object" } } }, "type": {"type": "text"}, "backend_id": {"type": "text"}, "metadata": { "type": "object", "enabled" : false }, "scheduled": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" }, "started": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" }, "ended": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" } } } }, "aliases": {} } diff --git a/debian/changelog b/debian/changelog index 19106a9..1d8d51f 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,158 +1,160 @@ -swh-scheduler (0.0.20-1~swh1~bpo9+1) stretch-swh; urgency=medium +swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium - * Rebuild for stretch-backports. + * v0.0.21 + * Adapt default configuration + * Fix typo in configuration variable name - -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 + -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 15:02:55 +0200 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium * v0.0.20 * swh.scheduler.cli.archive: Open completed oneshot or disabled * recurring tasks archival endpoint * swh.core.serializer: Move to msgpack serialization format * swh.scheduler.cli: Unify pretty print output * sql/data: Add new task type for loading mercurial dump * swh.scheduler.cli: Add sample use case for the scheduling cli * swh.scheduler.cli: Open policy column to the scheduling cli * swh.scheduler.cli: Open the delimiter option as cli argument * Fix issue when updating task-type without any retry delay defined * swh-scheduler/data: Add new oneshot scheduling load-mercurial task * backend: fix default scheduling_db value for consistency * backend: doc: fix return value of create_tasks -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium * v0.0.19 * swh.scheduler.utils: Open utility function to create oneshot task -- Antoine R. Dumont (@ardumont) Wed, 29 Nov 2017 12:51:15 +0100 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.18 * Celery 4 compatibility -- Nicolas Dandrimont Wed, 08 Nov 2017 17:06:22 +0100 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.17 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 18:49:02 +0200 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.16 * add some tests * implement one-shot tasks * implement retry on temporary failure -- Nicolas Dandrimont Mon, 07 Aug 2017 18:44:03 +0200 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.15 * Add some methods to get the length of task queues * worker: Show logs on stdout if loglevel = debug -- Nicolas Dandrimont Mon, 19 Jun 2017 19:44:56 +0200 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler 0.0.14 * Make the return value of tasks available in the listener -- Nicolas Dandrimont Mon, 12 Jun 2017 17:50:32 +0200 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.13 * Use systemd for logging rather than PostgreSQL -- Nicolas Dandrimont Fri, 07 Apr 2017 11:57:50 +0200 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.12 * Only log to database if the configuration is present -- Nicolas Dandrimont Thu, 09 Mar 2017 11:12:45 +0100 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.11 * add utils.get_task -- Nicolas Dandrimont Tue, 14 Feb 2017 19:49:34 +0100 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.10 * Allow disabling tasks -- Nicolas Dandrimont Thu, 20 Oct 2016 17:20:17 +0200 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.9 * Revert management of one shot tasks * Add possibility of launching several worker instances -- Nicolas Dandrimont Fri, 02 Sep 2016 17:09:18 +0200 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium * v0.0.7 * Add oneshot task -- Antoine R. Dumont (@ardumont) Fri, 01 Jul 2016 16:42:45 +0200 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.6 * More reliability and efficiency when scheduling a lot ot tasks -- Nicolas Dandrimont Wed, 24 Feb 2016 18:46:57 +0100 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.5 * Use copy for task mass-scheduling -- Nicolas Dandrimont Wed, 24 Feb 2016 12:13:38 +0100 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.4 * general cleanup of the backend * use arrow instead of dateutil * add new cli program -- Nicolas Dandrimont Tue, 23 Feb 2016 17:46:04 +0100 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.3 * Implement the timestamp arguments to the task_run functions * Make the celery event listener use a reliable queue -- Nicolas Dandrimont Mon, 22 Feb 2016 15:14:28 +0100 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.2 * Multiple schema changes * Initial releases for the celery job runner and the event listener -- Nicolas Dandrimont Fri, 19 Feb 2016 18:50:47 +0100 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * Release swh.scheduler v0.0.1 * Move swh.core.scheduling and swh.core.worker to swh.scheduler -- Nicolas Dandrimont Mon, 15 Feb 2016 11:07:30 +0100 diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 3d6878b..2bd931f 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.20 +Version: 0.0.21 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py index e280e9f..2cb9b2a 100644 --- a/swh/scheduler/backend_es.py +++ b/swh/scheduler/backend_es.py @@ -1,166 +1,166 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Elastic Search backend """ from swh.core import utils from swh.core.config import SWHConfig from elasticsearch import Elasticsearch from elasticsearch.helpers import streaming_bulk class SWHElasticSearchClient(SWHConfig): - DEFAULT_BASE_FILENAME = 'backend/elastic' + CONFIG_BASE_FILENAME = 'backend/elastic' DEFAULT_CONFIG = { 'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]), 'index_name_prefix': ('str', 'swh-tasks'), 'client_options': ('dict', { 'sniff': True, 'http_compress': False, }) } def __init__(self, **config): if config: self.config = config else: self.config = self.parse_config_file() options = self.config['client_options'] sniff = options['sniff'] self.storage = Elasticsearch( # nodes to use by default self.config['storage_nodes'], # auto detect cluster's status sniff_on_start=sniff, sniff_on_connection_fail=sniff, sniffer_timeout=60, # compression or not http_compress=options['http_compress']) self.index_name_prefix = self.config['index_name_prefix'] # document's index type (cf. ../../data/elastic-template.json) self.doc_type = 'task' def compute_index_name(self, year, month): """Given a year, month, compute the index's name. """ return '%s-%s-%s' % ( self.index_name_prefix, year, '%02d' % month) def index(self, data): """Index given data to elasticsearch. The field 'ended' in data is used to compute the index to index data to. """ date = data['ended'] index_name = self.compute_index_name(date.year, date.month) return self.storage.index(index=index_name, doc_type=self.doc_type, body=data) def mget(self, index_name, doc_ids, chunk_size=500, source=True, log=None): """Retrieve document's full content according to their ids as per source's setup. The `source` permits to retrieve only what's of interest to us, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. doc_ids (generator): Generator of ids to retrieve chunk_size (int): Number of documents chunk to send for retrieval source (bool/[str]): Source of information to return Yields: document indexed as per source's setup """ if isinstance(source, list): source = {'_source': ','.join(source)} else: source = {'_source': str(source).lower()} for ids in utils.grouper(doc_ids, n=1000): res = self.storage.mget(body={'ids': list(ids)}, index=index_name, doc_type=self.doc_type, params=source) if not res: if log: log.error('Error during retrieval of data, skipping!') continue for doc in res['docs']: found = doc.get('found') if not found: msg = 'Doc id %s not found, not indexed yet' % doc['_id'] if log: log.warning(msg) continue yield doc['_source'] def _streaming_bulk(self, index_name, doc_stream, chunk_size=500, log=None): """Bulk index data and returns the successful indexed data's identifier. Args: index_name (str): Name of the concerned index. doc_stream (generator): Generator of documents to index chunk_size (int): Number of documents chunk to send for indexation Yields: document id indexed """ actions = ({'_index': index_name, '_op_type': 'index', '_type': self.doc_type, '_source': data} for data in doc_stream) for ok, result in streaming_bulk(client=self.storage, actions=actions, chunk_size=chunk_size, raise_on_exception=False): if not ok: if log: log.error('Error during %s indexation. Skipping.' % result) continue yield result['index']['_id'] def streaming_bulk(self, index_name, doc_stream, chunk_size=500, source=True, log=None): """Bulk index data and returns the successful indexed data as per source's setup. the `source` permits to retrieve only what's of interest to us, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. doc_stream (generator): Document generator to index chunk_size (int): Number of documents chunk to send source (bool, [str]): the information to return """ indexed_ids = self._streaming_bulk( index_name, doc_stream, chunk_size=chunk_size, log=log) yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size, source=source, log=log) diff --git a/version.txt b/version.txt index ad75280..0c3a2c3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.20-0-gd6b393d \ No newline at end of file +v0.0.21-0-gffd00cb \ No newline at end of file