diff --git a/debian/control b/debian/control index 6916730e..18ce6bff 100644 --- a/debian/control +++ b/debian/control @@ -1,26 +1,28 @@ Source: swh-deposit Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-setuptools, python3-all, python3-nose, python3-vcversioner, python3-swh.core (>= 0.0.14~), python3-swh.loader.tar (>= 0.0.26~), + python3-swh.scheduler (>= 0.0.17~), python3-django, python3-click, python3-vcversioner, python3-djangorestframework, python3-djangorestframework-xml Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/source/swh-deposit/ Package: python3-swh.deposit Architecture: all Depends: python3-swh.core (>= 0.0.14~), python3-swh.loader.tar (>= 0.0.26~), + python3-swh.scheduler (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Deposit Server diff --git a/requirements-swh.txt b/requirements-swh.txt index 6c95d9a8..76ba1a29 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ swh.core >= 0.0.14 swh.loader.tar >= 0.0.26 +swh.scheduler >= v0.0.17 diff --git a/swh/deposit/config.py b/swh/deposit/config.py index b2fc7a90..dc7a25b2 100644 --- a/swh/deposit/config.py +++ b/swh/deposit/config.py @@ -1,65 +1,66 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from swh.core.config import SWHConfig # IRIs (Internationalized Resource identifier) sword 2.0 specified EDIT_SE_IRI = 'edit_se_iri' EM_IRI = 'em_iri' CONT_FILE_IRI = 'cont_file_iri' SD_IRI = 'servicedocument' COL_IRI = 'upload' STATE_IRI = 'status' +DEPOSIT_RAW_CONTENT = 'download' ARCHIVE_KEY = 'archive' METADATA_KEY = 'metadata' AUTHORIZED_PLATFORMS = ['development', 'production'] def setup_django_for(platform): """Setup function for command line tools (swh.deposit.create_user, swh.deposit.scheduler.cli) to initialize the needed db access. Note: Do not import any django related module prior to this function call. Otherwise, this will raise an django.core.exceptions.ImproperlyConfigured error message. Args: platform (str): the platform the scheduling is running Raises: ValueError in case of wrong platform inputs. """ if platform not in AUTHORIZED_PLATFORMS: raise ValueError('Platform should be one of %s' % AUTHORIZED_PLATFORMS) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'swh.deposit.settings.%s' % platform) import django django.setup() class SWHDefaultConfig(SWHConfig): """Mixin intended to enrich views with SWH configuration. """ CONFIG_BASE_FILENAME = 'deposit/server' DEFAULT_CONFIG = { 'max_upload_size': ('int', 209715200), } def __init__(self, **config): super().__init__() self.config = self.parse_config_file() self.config.update(config) self.log = logging.getLogger('swh.deposit') diff --git a/swh/deposit/scheduler/cli.py b/swh/deposit/scheduler/cli.py index 6483153f..b2c48a15 100644 --- a/swh/deposit/scheduler/cli.py +++ b/swh/deposit/scheduler/cli.py @@ -1,321 +1,248 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of scheduling deposit injection one-shot task to swh. """ import click -import os import logging from abc import ABCMeta, abstractmethod from swh.core.config import SWHConfig from swh.deposit.config import setup_django_for from swh.model import hashutil -from swh.objstorage import get_objstorage def previous_revision_id(swh_id): """Compute the parent's revision id (if any) from the swh_id. Args: swh_id (id): SWH Identifier from a previous deposit. Returns: None if no parent revision is detected. The revision id's hash if any. """ if swh_id: return swh_id.split('-')[2] return None class SWHScheduling(SWHConfig, metaclass=ABCMeta): """Base swh scheduling class to aggregate the schedule deposit injection. """ - CONFIG_BASE_FILENAME = 'deposit/server' - DEFAULT_CONFIG = { - 'objstorage': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5002', - } - }), - 'extraction_dir': ('str', '/srv/storage/space/tmp/') - } def __init__(self): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.log = logging.getLogger('swh.deposit.scheduling') - self.objstorage = get_objstorage(**self.config['objstorage']) - - def _aggregate_tarballs(self, deposit, archive_requests): - """Retrieve and aggregates tarballs information. - - """ - import shutil - import tempfile - from swh.loader.tar import tarball - - # root directory to manipulate tarballs - extraction_dir = self.config['extraction_dir'] - - os.makedirs(extraction_dir, 0o755, exist_ok=True) - dir_path = tempfile.mkdtemp(prefix='swh.deposit.scheduler-', - dir=extraction_dir) - - if len(archive_requests) > 1: # need to rebuild one archive - # from multiple ones - # root folder to build an aggregated tarball - aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') - os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) - - for archive_request in archive_requests: - archive = archive_request.metadata['archive'] - archive_name = archive['name'] - archive_id = archive['id'] - - # write in a temporary location the tarball - temp_tarball = os.path.join(dir_path, archive_name) - - # build the temporary tarball - with open(temp_tarball, 'wb') as f: - for chunk in self.objstorage.get_stream(archive_id): - f.write(chunk) - - # to uncompress it in another temporary tarball directory - tarball.uncompress(temp_tarball, aggregated_tarball_rootdir) - - # clean up the temporary compressed tarball - os.remove(temp_tarball) - - # Aggregate into one big tarball the multiple smaller ones - temp_tarball = tarball.compress( - aggregated_tarball_rootdir + '.zip', - nature='zip', - dirpath_or_files=aggregated_tarball_rootdir) - - # clean up the temporary uncompressed tarball - shutil.rmtree(aggregated_tarball_rootdir) - - else: # we only need to retrieve the archive from the - # objstorage - archive = archive_requests[0].metadata['archive'] - archive_name = archive['name'] - archive_id = archive['id'] - - # write in a temporary location the tarball - temp_tarball = os.path.join(dir_path, archive_name) - - # build the temporary tarball - with open(temp_tarball, 'wb') as f: - for chunk in self.objstorage.get_stream(archive_id): - f.write(chunk) - - # FIXME: 1. Need to clean up the temporary space - # FIXME: 2. In case of multiple archives, the archive name - # elected here is the last one - - print('temporary tarball', temp_tarball) - - data = { - 'tarpath': temp_tarball, - 'name': archive_name, - } - return data def _aggregate_metadata(self, deposit, metadata_requests): """Retrieve and aggregates metadata information. """ metadata = {} for req in metadata_requests: metadata.update(req.metadata) return metadata - def aggregate(self, deposit, requests): + def aggregate(self, deposit, deposit_archive_url, requests): """Aggregate multiple data on deposit into one unified data dictionary. Args: deposit (Deposit): Deposit concerned by the data aggregation. + deposit_archive_url (str): Url to retrieve a tarball from + the deposit instance requests ([DepositRequest]): List of associated requests which need aggregation. Returns: Dictionary of data representing the deposit to inject in swh. """ data = {} - metadata_requests = [] - archive_requests = [] - for req in requests: - if req.type.name == 'archive': - archive_requests.append(req) - elif req.type.name == 'metadata': - metadata_requests.append(req) - else: - raise ValueError('Unknown request type %s' % req.type) # Retrieve tarballs/metadata information - archive_data = self._aggregate_tarballs(deposit, archive_requests) metadata = self._aggregate_metadata(deposit, metadata_requests) - data['tarpath'] = archive_data['tarpath'] + data['deposit_archive_url'] = deposit_archive_url # Read information metadata data['origin'] = { 'type': deposit.collection.name, 'url': deposit.external_id, } # revision fullname = deposit.client.get_full_name() author_committer = { 'name': deposit.client.last_name, 'fullname': fullname, 'email': deposit.client.email, } revision_type = 'tar' revision_msg = '%s: Deposit %s in collection %s' % ( fullname, deposit.id, deposit.collection.name) complete_date = deposit.complete_date data['revision'] = { 'synthetic': True, 'date': complete_date, 'committer_date': complete_date, 'author': author_committer, 'committer': author_committer, 'type': revision_type, 'message': revision_msg, 'metadata': metadata, } parent_revision = previous_revision_id(deposit.swh_id) if parent_revision: data['revision'] = { 'parents': [hashutil.hash_to_bytes(parent_revision)] } - data['occurrence'] = { - 'branch': archive_data['name'], - } - return data @abstractmethod def schedule(self, deposit, data): """Schedule the new deposit injection. Args: deposit (Deposit): Deposit concerned by the data aggregation. data (dict): Deposit aggregated data Returns: None """ pass class SWHCeleryScheduling(SWHScheduling): """Deposit injection as Celery task scheduling. """ ADDITIONAL_CONFIG = { - 'task_name': ('str', 'swh.loader.tar.tasks.LoadTarRepository'), + 'task_name': ('str', 'swh.deposit.tasks.LoadDepositArchive'), 'dry_run': ('bool', False), } def __init__(self): super().__init__() from swh.scheduler import utils self.task_name = self.config['task_name'] self.task = utils.get_task(self.task_name) self.dry_run = self.config['dry_run'] def schedule(self, deposit_data): """Schedule the new deposit injection directly through celery. Args: deposit_data (dict): Deposit aggregated information. Returns: None """ - tarpath = deposit_data['tarpath'] + deposit_archive_url = deposit_data['deposit_archive_url'] origin = deposit_data['origin'] visit_date = None # default to Now revision = deposit_data['revision'] - occurrence = deposit_data['occurrence'] if not self.dry_run: return self.task.delay( - tarpath, origin, visit_date, revision, [occurrence]) + deposit_archive_url=deposit_archive_url, + origin=origin, + visit_date=visit_date, + revision=revision) - print(tarpath, origin, visit_date, revision, [occurrence]) + print(deposit_archive_url, origin, visit_date, revision) class SWHScheduling(SWHConfig): """Deposit injection as SWH's task scheduling interface. """ ADDITIONAL_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler-dev'), } def __init__(self): super().__init__() from swh.scheduler.backend import SchedulerBackend self.scheduler = SchedulerBackend(self.config) def schedule(self, deposit, data): """Schedule the new deposit injection through swh.scheduler's one-shot task api. Args: deposit (Deposit): Deposit concerned by the data aggregation. data (dict): Deposit aggregated data Returns: None """ pass @click.command( help='Schedule one-shot deposit injections') @click.option('--platform', default='development', help='development or production platform') -def main(platform): +@click.option('--scheduling-method', default='celery', + help='Scheduling method') +@click.option('--server', default='http://127.0.0.1:5006', + help='Deposit server') +def main(platform, scheduling_method, server): setup_django_for(platform) - from swh.deposit.models import Deposit, DepositRequest + from swh.deposit.models import Deposit, DepositRequest, DepositRequestType - scheduling = SWHCeleryScheduling() + if scheduling_method == 'celery': + scheduling = SWHCeleryScheduling() + elif scheduling_method == 'swh-scheduler': + scheduling = SWHScheduling() + else: + raise ValueError( + 'Only `celery` or `swh-scheduler` values are accepted') + + from swh.deposit.config import DEPOSIT_RAW_CONTENT + from django.core.urlresolvers import reverse + + _request_types = DepositRequestType.objects.all() + deposit_request_types = { + type.name: type for type in _request_types + } deposits = Deposit.objects.filter(status='ready') for deposit in deposits: - requests = DepositRequest.objects.filter(deposit_id=deposit.id) + deposit_archive_url = '%s%s' % (server, reverse( + DEPOSIT_RAW_CONTENT, + args=[deposit.collection.name, deposit.id])) + + requests = DepositRequest.objects.filter( + deposit=deposit, type=deposit_request_types['metadata']) + + deposit_data = scheduling.aggregate( + deposit, deposit_archive_url, requests) - deposit_data = scheduling.aggregate(deposit, requests) scheduling.schedule(deposit_data) if __name__ == '__main__': main() diff --git a/swh/deposit/tasks.py b/swh/deposit/tasks.py new file mode 100644 index 00000000..fbee23f3 --- /dev/null +++ b/swh/deposit/tasks.py @@ -0,0 +1,39 @@ +# Copyright (C) 2015-2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.scheduler.task import Task + +from swh.loader.tar.loader import TarLoader + + +def fetch_archive_locally(archive_url): + pass + + +class LoadDepositArchive(Task): + task_queue = 'swh_deposit_archive' + + def run_task(self, *, deposit_archive_url, origin, visit_date, + revision): + """Import a deposit tarball into swh. + + Args: see :func:`TarLoader.load`. + + """ + loader = TarLoader() + loader.log = self.log + + # FIXME: Retrieve tarball and copy locally + + tar_path = 'foobar' + + import os + occurrence = os.path.basename(tar_path) + + self.log.info('%s %s %s %s %s' % (deposit_archive_url, origin, + visit_date, revision, + [occurrence])) + # loader.load(tar_path=tar_path, origin=origin, visit_date=visit_date, + # revision=revision, occurrences=[occurrence]) diff --git a/swh/deposit/urls.py b/swh/deposit/urls.py index f2dc7fb5..0e16571c 100644 --- a/swh/deposit/urls.py +++ b/swh/deposit/urls.py @@ -1,77 +1,77 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """swh URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/1.10/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.conf.urls import url, include 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) """ from django.conf.urls import url from rest_framework.urlpatterns import format_suffix_patterns from .config import EDIT_SE_IRI, EM_IRI, CONT_FILE_IRI -from .config import SD_IRI, COL_IRI, STATE_IRI +from .config import SD_IRI, COL_IRI, STATE_IRI, DEPOSIT_RAW_CONTENT from .api.common import index from .api.deposit import SWHDeposit from .api.deposit_status import SWHDepositStatus from .api.deposit_update import SWHUpdateMetadataDeposit from .api.deposit_update import SWHUpdateArchiveDeposit from .api.deposit_content import SWHDepositContent from .api.service_document import SWHServiceDocument from .api.deposit_read import SWHDepositReadArchives urlpatterns = [ url(r'^$', index, name='home'), # SD IRI - Service Document IRI # -> GET url(r'^1/servicedocument/', SWHServiceDocument.as_view(), name=SD_IRI), # Col IRI - Collection IRI # -> POST url(r'^1/(?P[^/]+)/$', SWHDeposit.as_view(), name=COL_IRI), # EM IRI - Atom Edit Media IRI (update archive IRI) # -> PUT (update-in-place existing archive) # -> POST (add new archive) url(r'^1/(?P[^/]+)/(?P[^/]+)/media/$', SWHUpdateArchiveDeposit.as_view(), name=EM_IRI), # Edit IRI - Atom Entry Edit IRI (update metadata IRI) # SE IRI - Sword Edit IRI ;; possibly same as Edit IRI # -> PUT (update in place) # -> POST (add new metadata) url(r'^1/(?P[^/]+)/(?P[^/]+)/metadata/$', SWHUpdateMetadataDeposit.as_view(), name=EDIT_SE_IRI), # State IRI # -> GET url(r'^1/(?P[^/]+)/(?P[^/]+)/status/$', SWHDepositStatus.as_view(), name=STATE_IRI), # Cont/File IRI # -> GET url(r'^1/(?P[^/]+)/(?P[^/]+)/content/$', SWHDepositContent.as_view(), name=CONT_FILE_IRI), # specification is not clear about # FILE-IRI, we assume it's the same as # the CONT-IRI one # Retrieve deposit's raw archives' content # -> GET url(r'^1/(?P[^/]+)/(?P[^/]+)/raw/$', SWHDepositReadArchives.as_view(), - name='download'), + name=DEPOSIT_RAW_CONTENT), ] urlpatterns = format_suffix_patterns(urlpatterns)