Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/npm/tasks.py
# Copyright (C) 2018 the Software Heritage developers | # Copyright (C) 2018 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from swh.scheduler.celery_backend.config import app | from celery import shared_task | ||||
from swh.lister.npm.lister import NpmLister, NpmIncrementalLister | from swh.lister.npm.lister import NpmLister, NpmIncrementalLister | ||||
from swh.lister.npm.models import NpmVisitModel | from swh.lister.npm.models import NpmVisitModel | ||||
@contextmanager | @contextmanager | ||||
def save_registry_state(lister): | def save_registry_state(lister): | ||||
params = {'headers': lister.request_headers()} | params = {'headers': lister.request_headers()} | ||||
Show All 18 Lines | def get_last_update_seq(lister): | ||||
row = query.order_by(NpmVisitModel.uid.desc()).first() | row = query.order_by(NpmVisitModel.uid.desc()).first() | ||||
if not row: | if not row: | ||||
raise ValueError('No npm registry listing previously performed ! ' | raise ValueError('No npm registry listing previously performed ! ' | ||||
'This is required prior to the execution of an ' | 'This is required prior to the execution of an ' | ||||
'incremental listing.') | 'incremental listing.') | ||||
return row[0] | return row[0] | ||||
@app.task(name=__name__ + '.NpmListerTask') | @shared_task(name=__name__ + '.NpmListerTask') | ||||
def list_npm_full(**lister_args): | def list_npm_full(**lister_args): | ||||
'Full lister for the npm (javascript) registry' | 'Full lister for the npm (javascript) registry' | ||||
lister = NpmLister(**lister_args) | lister = NpmLister(**lister_args) | ||||
with save_registry_state(lister): | with save_registry_state(lister): | ||||
lister.run() | lister.run() | ||||
@app.task(name=__name__ + '.NpmIncrementalListerTask') | @shared_task(name=__name__ + '.NpmIncrementalListerTask') | ||||
def list_npm_incremental(**lister_args): | def list_npm_incremental(**lister_args): | ||||
'Incremental lister for the npm (javascript) registry' | 'Incremental lister for the npm (javascript) registry' | ||||
lister = NpmIncrementalLister(**lister_args) | lister = NpmIncrementalLister(**lister_args) | ||||
update_seq_start = get_last_update_seq(lister) | update_seq_start = get_last_update_seq(lister) | ||||
with save_registry_state(lister): | with save_registry_state(lister): | ||||
lister.run(min_bound=update_seq_start) | lister.run(min_bound=update_seq_start) | ||||
@app.task(name=__name__ + '.ping') | @shared_task(name=__name__ + '.ping') | ||||
def _ping(): | def _ping(): | ||||
return 'OK' | return 'OK' |