diff --git a/PKG-INFO b/PKG-INFO
index b26d95b..d52018f 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.12
+Version: 0.0.13
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO
index b26d95b..d52018f 100644
--- a/swh.core.egg-info/PKG-INFO
+++ b/swh.core.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.12
+Version: 0.0.13
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh/core/hashutil.py b/swh/core/hashutil.py
index ff0679c..5c3059b 100644
--- a/swh/core/hashutil.py
+++ b/swh/core/hashutil.py
@@ -1,106 +1,106 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import binascii
 import functools
 import hashlib
 import os
 
 from io import BytesIO
 
 # supported hashing algorithms
 ALGORITHMS = set(['sha1', 'sha256', 'sha1_git'])
 
 # should be a multiple of 64 (sha1/sha256's block size)
 # FWIW coreutils' sha1sum uses 32768
 HASH_BLOCK_SIZE = 32768
 
 
 def _new_hash(algo, length=None):
     """Initialize a digest object (as returned by python's hashlib) for the
     requested algorithm. See the constant ALGORITHMS for the list of
     supported algorithms.
 
     If a git-specific hashing algorithm is requested (e.g., "sha1_git"), the
     hashing object will be pre-fed with the needed header; for this to work,
     length must be given.
 
     """
     if algo not in ALGORITHMS:
         raise ValueError('unknown hashing algorithm ' + algo)
 
     h = None
     if algo.endswith('_git'):
         if length is None:
             raise ValueError('missing length for git hashing algorithm')
         h = hashlib.new(algo.split('_')[0])
         h.update(('blob %d\0' % length).encode('ascii'))  # git hash header
     else:
         h = hashlib.new(algo)
 
     return h
 
 
 def _hash_file_obj(f, length, algorithms=ALGORITHMS, chunk_cb=None):
     """hash the content of a file-like object
 
     If chunk_cb is given, call it on each data chunk after updating the hash
 
     """
     hashers = {algo: _new_hash(algo, length) for algo in algorithms}
     while True:
         chunk = f.read(HASH_BLOCK_SIZE)
         if not chunk:
             break
         for h in hashers.values():
             h.update(chunk)
-            if chunk_cb:
-                chunk_cb(chunk)
+        if chunk_cb:
+            chunk_cb(chunk)
 
     return {algo: hashers[algo].digest() for algo in hashers}
 
 
 def _hash_fname(fname, algorithms=ALGORITHMS):
     """hash the content of a file specified by file name
 
     """
     length = os.path.getsize(fname)
     with open(fname, 'rb') as f:
         return _hash_file_obj(f, length)
 
 
 def hashfile(f, length=None, algorithms=ALGORITHMS):
     """Hash the content of a given file, given either as a file-like object
     or a file name. All specified hash algorithms will be computed, reading
     the file only once. Returns a dictionary mapping algorithm names to
     hex-encoded checksums.
 
     When passing a file-like object, content length must be given; when
     passing a file name, content length is ignored.
 
     """
     if isinstance(f, str):
         return _hash_fname(f, algorithms)
     else:
         return _hash_file_obj(f, length, algorithms)
 
 
 def hashdata(data, algorithms=ALGORITHMS):
     """Like hashfile, but hashes content passed as a string (of bytes)
 
     """
     buf = BytesIO(data)
     return _hash_file_obj(buf, len(data), algorithms)
 
 
 @functools.lru_cache()
 def hash_to_hex(hash):
     """Converts a hash to its hexadecimal string representation"""
     return binascii.hexlify(hash).decode('ascii')
 
 
 @functools.lru_cache()
 def hex_to_hash(hex):
     """Converts a hexadecimal string representation of a hash to that hash"""
     return bytes.fromhex(hex)
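
The hashutil entry points touched above accept either a file name or a file-like object plus an explicit length. A minimal usage sketch, assuming nothing beyond the module shown in the patch; the file name and contents are illustrative, and note that the returned digests are raw bytes, so hash_to_hex() is used for display:

    # Hash the same content three ways with swh.core.hashutil.
    from swh.core import hashutil

    # Illustrative test file, not part of the patch.
    with open('example.txt', 'wb') as f:
        f.write(b'hello world\n')

    # By file name: the length is taken from the file size on disk.
    by_name = hashutil.hashfile('example.txt')

    # By file-like object: length must be passed explicitly, since it is
    # needed to build the 'blob <length>\0' header for sha1_git.
    with open('example.txt', 'rb') as f:
        by_fileobj = hashutil.hashfile(f, length=12)

    # From an in-memory bytes value.
    by_data = hashutil.hashdata(b'hello world\n')

    for algo, digest in sorted(by_name.items()):
        print(algo, hashutil.hash_to_hex(digest))
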
""" if isinstance(f, str): return _hash_fname(f, algorithms) else: return _hash_file_obj(f, length, algorithms) def hashdata(data, algorithms=ALGORITHMS): """Like hashfile, but hashes content passed as a string (of bytes) """ buf = BytesIO(data) return _hash_file_obj(buf, len(data), algorithms) @functools.lru_cache() def hash_to_hex(hash): """Converts a hash to its hexadecimal string representation""" return binascii.hexlify(hash).decode('ascii') @functools.lru_cache() def hex_to_hash(hex): """Converts a hexadecimal string representation of a hash to that hash""" return bytes.fromhex(hex) diff --git a/swh/core/scheduling.py b/swh/core/scheduling.py index 16f307f..a7aedde 100644 --- a/swh/core/scheduling.py +++ b/swh/core/scheduling.py @@ -1,31 +1,34 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery from celery.utils.log import get_task_logger class Task(celery.Task): """a schedulable task (abstract class) - Sub-classes must implement the run() method + Sub-classes must implement the run() method. Sub-classes that + want their tasks to get routed to a non-default task queue must + override the task_queue attribute. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True + task_queue = 'celery' def run(self, *args, **kwargs): raise NotImplementedError('tasks must implement the run() method') @property def log(self): if not hasattr(self, '__log'): self.__log = get_task_logger('%s.%s' % (__name__, self.__class__.__name__)) return self.__log diff --git a/swh/core/worker.py b/swh/core/worker.py index 3f0dcdc..d5de51d 100644 --- a/swh/core/worker.py +++ b/swh/core/worker.py @@ -1,139 +1,138 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from celery import Celery from celery.signals import setup_logging from celery.utils.log import ColorFormatter from kombu import Exchange, Queue from swh.core.config import load_named_config from swh.core.logger import PostgresHandler CONFIG_NAME = 'worker.ini' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. 
""" if loglevel is None: loglevel = logging.DEBUG formatter = logging.Formatter(format) if colorize: color_formatter = ColorFormatter(format) else: color_formatter = formatter root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) pg = PostgresHandler(CONFIG['log_db']) pg.setFormatter(logging.Formatter(format)) pg.setLevel(logging.DEBUG) pg.setFormatter(formatter) root_logger.addHandler(console) root_logger.addHandler(pg) celery_logger = logging.getLogger('celery') celery_logger.setLevel(logging.INFO) # Silence useless "Starting new HTTP connection" messages urllib3_logger = logging.getLogger('urllib3') urllib3_logger.setLevel(logging.WARNING) swh_logger = logging.getLogger('swh') swh_logger.setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task celery_task_logger = logging.getLogger('celery.task') celery_task_logger.setLevel(loglevel) +class TaskRouter: + """Route tasks according to the task_queue attribute in the task class""" + def route_for_task(self, task, args=None, kwargs=None): + task_class = app.tasks[task] + if hasattr(task_class, 'task_queue'): + return {'queue': task_class.task_queue} + return None + + # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app app = Celery() app.conf.update( # The broker BROKER_URL=CONFIG['task_broker'], # Timezone configuration: all in UTC CELERY_ENABLE_UTC=True, CELERY_TIMEZONE='UTC', # Imported modules CELERY_IMPORTS=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. CELERY_TASK_RESULT_EXPIRES=None, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. CELERY_TRACK_STARTED=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # CELERY_MESSAGE_COMPRESSION='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) CELERY_DISABLE_RATE_LIMITS=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # CELERYD_TASK_TIME_LIMIT=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. # The task can catch this to e.g. clean up before the hard time limit # comes. 
diff --git a/version.txt b/version.txt
index a3e195a..a1c7b73 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.12-0-g444daaf
\ No newline at end of file
+v0.0.13-0-gb638943
\ No newline at end of file
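
End to end, a worker built from this module consumes the default 'celery' queue plus every queue listed in the task_queues configuration entry, so a client only needs the task name. A sketch, assuming a worker launched with `celery worker -A swh.core.worker` and the illustrative task from the earlier examples:

    from swh.core.worker import app

    # TaskRouter sends this to the 'swh_indexer' queue; a worker whose
    # task_queues setting includes that queue will pick the message up.
    result = app.send_task('swh.example.tasks.IndexRepository',
                           args=['https://example.org/repo.git'])
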