diff --git a/docs/spec-injection.md b/docs/spec-injection.md index 4c609cde..5715c9a0 100644 --- a/docs/spec-injection.md +++ b/docs/spec-injection.md @@ -1,219 +1,219 @@ # Injection specification (draft) This part discusses the deposit injection part on the server side. ## Tarball Injection The `swh-loader-tar` module is already able to inject tarballs in swh with very limited metadata (mainly the origin). The injection of the deposit will use the deposit's associated data: - the metadata - the archive(s) We will use the `synthetic` revision notion. To that revision will be associated the metadata. Those will be included in the hash computation, thus resulting in a unique identifier. ### Injection mapping Some of those metadata will also be included in the `origin_metadata` table. ``` origin | https://hal.inria.fr/hal-id | ------------------------------------|----------------------------------------| origin_visit | 1 :reception_date | origin_metadata | aggregated metadata | occurrence & occurrence_history | branch: client's version n° (e.g hal) | revision | synthetic_revision (tarball) | directory | upper level of the uncompressed archive| ``` ### Questions raised concerning injection - A deposit has one origin, yet an origin can have multiple deposits? No, an origin can have multiple requests for the same deposit. Which should end up in one single deposit (when the client pushes its final request saying deposit 'done' through the header In-Progress). Only update of existing 'partial' deposit is permitted. Other than that, the deposit 'update' operation. To create a new version of a software (already deposited), the client must prior to this create a new deposit. 
Illustration First deposit injection: HAL's deposit 01535619 = SWH's deposit **01535619-1** + 1 origin with url:https://hal.inria.fr/medihal-01535619 + 1 synthetic revision + 1 directory HAL's update on deposit 01535619 = SWH's deposit **01535619-2** (*with HAL, updates can only be on the metadata, and a new version is required if the content changes) + 1 origin with url:https://hal.inria.fr/medihal-01535619 + new synthetic revision (with new metadata) + same directory HAL's deposit 01535619-v2 = SWH's deposit **01535619-v2-1** + same origin + new revision + new directory ## Technical details ### Requirements - one dedicated database to store the deposit's state - swh-deposit - one dedicated temporary objstorage to store archives before injection - one client to test the communication with SWORD protocol ### Deposit reception schema - SWORD imposes the use of basic authentication, so we need a way to authenticate clients. Also, a client can access collections: **deposit_client** table: - id (bigint): Client's identifier - username (str): Client's username - password (pass): Client's encrypted password - collections ([id]): List of collections the client can access - Collections group deposits together: **deposit_collection** table: - id (bigint): Collection's identifier - name (str): Collection's human readable name - A deposit is the main object the repository is all about: **deposit** table: - id (bigint): deposit's identifier - reception_date (date): First deposit's reception date - complete_date (date): Date when the deposit is deemed complete and ready for injection - collection (id): The collection the deposit belongs to - external_id (text): client's internal identifier (e.g. HAL's id, etc...). 
- client_id (id) : Client which did the deposit - swh_id (str) : swh identifier result once the injection is complete - status (enum): The deposit's current status - As mentioned, a deposit can have a status, whose possible values are: ``` text 'partial', -- the deposit is new or partially received since it -- can be done in multiple requests 'expired', -- deposit has been there too long and is now deemed -- ready to be garbage collected 'ready', -- deposit is fully received and ready for injection 'injecting', -- injection is ongoing on swh's side 'success', -- injection is successful 'failure' -- injection is a failure ``` A deposit is stateful and can be made in multiple requests: **deposit_request** table: - id (bigint): identifier - type (id): deposit request's type (possible values: 'archive', 'metadata') - deposit_id (id): deposit to which the request belongs - metadata: metadata associated to the request - date (date): date of the request Information sent along with a request is stored in a `deposit_request` row. A request can be either of type `metadata` (atom entry, multipart's atom entry part) or of type `archive` (binary upload, multipart's binary upload part). When the deposit is complete (status `ready`), those `metadata` and `archive` deposit requests will be read and aggregated. They will then be sent as parameters to the injection routine. During injection, some of those metadata are kept in the `origin_metadata` table and some others are stored in the `revision` table (see [metadata injection](#metadata-injection)). 
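The aggregation step described above can be sketched as follows. This is only a minimal illustration, assuming each `metadata` request carries a flat dictionary and that requests are merged in reception order, so a later request overrides the keys of an earlier one. The function name and the sample keys and values are hypothetical.

```python
def aggregate_metadata(metadata_requests):
    """Merge the metadata of several deposit requests into one dictionary.

    Later entries override earlier ones on key collision.
    """
    metadata = {}
    for request_metadata in metadata_requests:
        metadata.update(request_metadata)
    return metadata


# Hypothetical metadata received in two successive requests
requests = [
    {'title': 'my-software', 'author': 'someone'},
    {'title': 'my-software v2'},
]
aggregated = aggregate_metadata(requests)
```

With this merge strategy the order in which requests are read matters, so a real implementation would probably want to sort them explicitly (for instance by identifier or date).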
The only update actions occurring on the deposit table concern: - status changing: - `partial` -> {`expired`/`ready`}, - `ready` -> `injecting`, - `injecting` -> {`success`/`failure`} - `complete_date` when the deposit is finalized (when the status is changed to ready) - `swh-id` is populated once we have the injection result #### SWH Identifier returned - swh-- + The synthetic revision id - e.g: swh-hal-47dc6b4636c7f6cba0df83e3d5490bf4334d987e + e.g: 47dc6b4636c7f6cba0df83e3d5490bf4334d987e ### Scheduling injection All `archive` and `metadata` deposit requests should be aggregated before injection. The injection should be scheduled via the scheduler's API. Only `ready` deposits are concerned by the injection. When the injection is done and successful, the deposit entry is updated: - `status` is updated to `success` - `swh-id` is populated with the resulting hash (cf. [swh identifier](#swh-identifier-returned)) - `complete_date` is updated to the injection's finished time When the injection fails, the deposit entry is updated: - `status` is updated to `failure` - `swh-id` and `complete_date` remain as is *Note:* As a further improvement, we may prefer having a retry policy with graceful delays for further scheduling. ### Metadata injection - the metadata received with the deposit should be kept in the `origin_metadata` table before translation as part of the injection process and an indexation process should be scheduled. 
- provider_id and tool_id are resolved by the prepare_metadata method in the loader-core - the origin_metadata entry is sent to storage by the send_origin_metadata in the loader-core origin_metadata table: ``` id bigint PK origin bigint discovery_date date provider_id bigint FK // (from provider table) tool_id bigint FK // indexer_configuration_id tool used for extraction metadata jsonb // before translation ``` diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index 4595b2e8..8532bffc 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,234 +1,233 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import tempfile from contextlib import contextmanager from django.http import FileResponse from rest_framework import status from swh.loader.tar import tarball from swh.model import hashutil, identifiers from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...models import Deposit, DepositRequest -from ...models import previous_revision_id @contextmanager def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. 
Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Returns: Tuple (directory to clean up, archive path (aggregated or not)) """ if len(archive_paths) > 1: # need to rebuild one archive # from multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.deposit-', dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = tarball.compress( aggregated_tarball_rootdir + '.zip', nature='zip', dirpath_or_files=aggregated_tarball_rootdir) # can already clean up temporary directory shutil.rmtree(aggregated_tarball_rootdir) try: yield temp_tarpath finally: shutil.rmtree(dir_path) else: # only 1 archive, no need to do fancy actions (and no cleanup step) yield archive_paths[0] class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ ADDITIONAL_CONFIG = { 'extraction_dir': ('str', '/tmp/swh-deposit/archive/'), } def __init__(self): super().__init__() self.extraction_dir = self.config['extraction_dir'] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def retrieve_archives(self, deposit_id): """Given a deposit identifier, returns its associated archives' path. 
Yields: path to deposited archives """ deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id') for deposit_request in deposit_requests: yield deposit_request.archive.path def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ archive_paths = list(self.retrieve_archives(deposit_id)) with aggregate_tarballs(self.extraction_dir, archive_paths) as path: return FileResponse(open(path, 'rb'), status=status.HTTP_200_OK, content_type='application/octet-stream') class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView): """Class in charge of aggregating metadata on a deposit. """ ADDITIONAL_CONFIG = { 'provider': ('dict', { # 'provider_name': '', # those are not set since read from the # 'provider_url': '', # deposit's client 'provider_type': 'deposit_client', 'metadata': {} }), 'tool': ('dict', { 'tool_name': 'swh-deposit', 'tool_version': '0.0.1', 'tool_configuration': { 'sword_version': '2' } }) } def __init__(self): super().__init__() self.provider = self.config['provider'] self.tool = self.config['tool'] def _aggregate_metadata(self, deposit, metadata_requests): """Retrieve and aggregates metadata information. """ metadata = {} for req in metadata_requests: metadata.update(req.metadata) return metadata def aggregate(self, deposit, requests): """Aggregate multiple data on deposit into one unified data dictionary. Args: deposit (Deposit): Deposit concerned by the data aggregation. requests ([DepositRequest]): List of associated requests which need aggregation. Returns: Dictionary of data representing the deposit to inject in swh. 
""" data = {} # Retrieve tarballs/metadata information metadata = self._aggregate_metadata(deposit, requests) # Read information metadata data['origin'] = { 'type': 'deposit', 'url': os.path.join(deposit.client.url.rstrip('/'), deposit.external_id), } # revision fullname = deposit.client.get_full_name() author_committer = { 'name': deposit.client.last_name, 'fullname': fullname, 'email': deposit.client.email, } # metadata provider self.provider['provider_name'] = deposit.client.last_name self.provider['provider_url'] = deposit.client.url revision_type = 'tar' revision_msg = '%s: Deposit %s in collection %s' % ( fullname, deposit.id, deposit.collection.name) complete_date = identifiers.normalize_timestamp(deposit.complete_date) data['revision'] = { 'synthetic': True, 'date': complete_date, 'committer_date': complete_date, 'author': author_committer, 'committer': author_committer, 'type': revision_type, 'message': revision_msg, 'metadata': metadata, } - parent_revision = previous_revision_id(deposit.swh_id) + parent_revision = deposit.swh_id if parent_revision: data['revision'] = { 'parents': [hashutil.hash_to_bytes(parent_revision)] } data['occurrence'] = { 'branch': 'master' } data['origin_metadata'] = { 'provider': self.provider, 'tool': self.tool, 'metadata': metadata } return data def process_get(self, req, collection_name, deposit_id): deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) data = self.aggregate(deposit, requests) d = {} if data: d = json.dumps(data) return status.HTTP_200_OK, d, 'application/json' diff --git a/swh/deposit/api/private/deposit_update_status.py b/swh/deposit/api/private/deposit_update_status.py index 9f432ed0..bb87d474 100644 --- a/swh/deposit/api/private/deposit_update_status.py +++ b/swh/deposit/api/private/deposit_update_status.py @@ -1,71 +1,71 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the 
top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from rest_framework.parsers import JSONParser from ..common import SWHPutDepositAPI, SWHPrivateAPIView from ...errors import make_error_dict, BAD_REQUEST -from ...models import Deposit, DEPOSIT_STATUS_DETAIL, format_swh_id +from ...models import Deposit, DEPOSIT_STATUS_DETAIL class SWHUpdateStatusDeposit(SWHPutDepositAPI, SWHPrivateAPIView): """Deposit request class to update the deposit's status. HTTP verbs supported: PUT """ parser_classes = (JSONParser, ) def additional_checks(self, req, headers, collection_name, deposit_id=None): """Enrich existing checks to the default ones. New checks: - Ensure the status is provided - Ensure it exists """ data = req.data status = data.get('status') if not status: msg = 'The status key is mandatory with possible values %s' % list( DEPOSIT_STATUS_DETAIL.keys()) return make_error_dict(BAD_REQUEST, msg) if status not in DEPOSIT_STATUS_DETAIL: msg = 'Possible status in %s' % list(DEPOSIT_STATUS_DETAIL.keys()) return make_error_dict(BAD_REQUEST, msg) if status == 'success': swh_id = data.get('revision_id') if not swh_id: msg = 'Updating status to %s requires a revision_id key' % ( status, ) return make_error_dict(BAD_REQUEST, msg) return {} def restrict_access(self, req, deposit=None): """Remove restriction modification to 'partial' deposit. Update is possible regardless of the existing status. 
""" return None def process_put(self, req, headers, collection_name, deposit_id): """Update the deposit's status Returns: 204 No content """ deposit = Deposit.objects.get(pk=deposit_id) deposit.status = req.data['status'] # checks already done before swh_id = req.data.get('revision_id') if swh_id: - deposit.swh_id = format_swh_id(collection_name, swh_id) + deposit.swh_id = swh_id deposit.save() return {} diff --git a/swh/deposit/models.py b/swh/deposit/models.py index 003794ec..91afdb7e 100644 --- a/swh/deposit/models.py +++ b/swh/deposit/models.py @@ -1,232 +1,202 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Generated from: # cd swh_deposit && \ # python3 -m manage inspectdb from django.contrib.postgres.fields import JSONField, ArrayField from django.contrib.auth.models import User, UserManager from django.db import models from django.utils.timezone import now from .config import DEPOSIT_STATUS_READY, DEPOSIT_STATUS_READY_FOR_CHECKS from .config import DEPOSIT_STATUS_PARTIAL class Dbversion(models.Model): """Db version """ version = models.IntegerField(primary_key=True) release = models.DateTimeField(default=now, null=True) description = models.TextField(blank=True, null=True) class Meta: db_table = 'dbversion' def __str__(self): return str({ 'version': self.version, 'release': self.release, 'description': self.description }) """Possible status""" DEPOSIT_STATUS = [ (DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_PARTIAL), ('expired', 'expired'), (DEPOSIT_STATUS_READY_FOR_CHECKS, DEPOSIT_STATUS_READY_FOR_CHECKS), (DEPOSIT_STATUS_READY, DEPOSIT_STATUS_READY), ('rejected', 'rejected'), ('injecting', 'injecting'), ('success', 'success'), ('failure', 'failure'), ] """Possible status and the detailed meaning.""" DEPOSIT_STATUS_DETAIL = { DEPOSIT_STATUS_PARTIAL: 'Deposit is 
new or partially received since it can' ' be done in multiple requests', 'expired': 'Deposit has been there too long and is now ' 'deemed ready to be garbage collected', DEPOSIT_STATUS_READY_FOR_CHECKS: 'Deposit is ready for additional checks ' '(tarball ok, etc...)', DEPOSIT_STATUS_READY: 'Deposit is fully received, checked, and ' 'ready for injection', 'rejected': 'Deposit failed the checks', 'injecting': "Injection is ongoing on swh's side", 'success': 'Injection is successful', 'failure': 'Injection is a failure', } class DepositClient(User): """Deposit client """ collections = ArrayField(models.IntegerField(), null=True) objects = UserManager() url = models.TextField(null=False) class Meta: db_table = 'deposit_client' def __str__(self): return str({ 'id': self.id, 'collections': self.collections, 'username': super().username, }) -def format_swh_id(collection_name, revision_id): - """Format swh_id value before storing in swh-deposit backend. - - Args: - collection_name (str): the collection's name - revision_id (str): the revision's hash identifier - - Returns: - The identifier as string - - """ - return 'swh-%s-%s' % (collection_name, revision_id) - - -def previous_revision_id(swh_id): - """Compute the parent's revision id (if any) from the swh_id. - - Args: - swh_id (id): SWH Identifier from a previous deposit. - - Returns: - None if no parent revision is detected. - The revision id's hash if any. 
- - """ - if swh_id: - return swh_id.split('-')[2] - return None - - class Deposit(models.Model): """Deposit reception table """ id = models.BigAutoField(primary_key=True) # First deposit reception date reception_date = models.DateTimeField(auto_now_add=True) # Date when the deposit is deemed complete and ready for injection complete_date = models.DateTimeField(null=True) # collection concerned by the deposit collection = models.ForeignKey( 'DepositCollection', models.DO_NOTHING) # Deposit's external identifier external_id = models.TextField() # Deposit client client = models.ForeignKey('DepositClient', models.DO_NOTHING) # SWH's injection result identifier swh_id = models.TextField(blank=True, null=True) # Deposit's status regarding injection status = models.TextField( choices=DEPOSIT_STATUS, default=DEPOSIT_STATUS_PARTIAL) class Meta: db_table = 'deposit' def __str__(self): return str({ 'id': self.id, 'reception_date': self.reception_date, 'collection': self.collection.name, 'external_id': self.external_id, 'client': self.client.username, 'status': self.status }) class DepositRequestType(models.Model): """Deposit request type made by clients (either archive or metadata) """ id = models.BigAutoField(primary_key=True) name = models.TextField() class Meta: db_table = 'deposit_request_type' def __str__(self): return str({'id': self.id, 'name': self.name}) def client_directory_path(instance, filename): """Callable to upload archive in MEDIA_ROOT/user_/ Args: instance (DepositRequest): DepositRequest concerned by the upload filename (str): Filename of the uploaded file Returns: A path to be prefixed by the MEDIA_ROOT to access physically to the file uploaded. """ return 'client_{0}/{1}'.format(instance.deposit.client.id, filename) class DepositRequest(models.Model): """Deposit request associated to one deposit. 
""" id = models.BigAutoField(primary_key=True) # Deposit concerned by the request deposit = models.ForeignKey(Deposit, models.DO_NOTHING) date = models.DateTimeField(auto_now_add=True) # Deposit request information on the data to inject # this can be null when type is 'archive' metadata = JSONField(null=True) # this can be null when type is 'metadata' archive = models.FileField(null=True, upload_to=client_directory_path) type = models.ForeignKey( 'DepositRequestType', models.DO_NOTHING) class Meta: db_table = 'deposit_request' def __str__(self): meta = None if self.metadata: from json import dumps meta = dumps(self.metadata) archive_name = None if self.archive: archive_name = self.archive.name return str({ 'id': self.id, 'deposit': self.deposit, 'metadata': meta, 'archive': archive_name }) class DepositCollection(models.Model): id = models.BigAutoField(primary_key=True) # Human readable name for the collection type e.g HAL, arXiv, etc... name = models.TextField() class Meta: db_table = 'deposit_collection' def __str__(self): return str({'id': self.id, 'name': self.name}) diff --git a/swh/deposit/tests/api/test_deposit_update_status.py b/swh/deposit/tests/api/test_deposit_update_status.py index 2ef3f2c6..909ad0ad 100644 --- a/swh/deposit/tests/api/test_deposit_update_status.py +++ b/swh/deposit/tests/api/test_deposit_update_status.py @@ -1,120 +1,119 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from django.core.urlresolvers import reverse from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL from swh.deposit.config import PRIVATE_PUT_DEPOSIT, DEPOSIT_STATUS_READY from ..common import BasicTestCase class UpdateDepositStatusTest(APITestCase, 
BasicTestCase): """Update the deposit's status scenario """ def setUp(self): super().setUp() deposit = Deposit(status=DEPOSIT_STATUS_READY, collection=self.collection, client=self.user) deposit.save() self.deposit = Deposit.objects.get(pk=deposit.id) assert self.deposit.status == DEPOSIT_STATUS_READY @istest def update_deposit_status(self): """Existing status for update should return a 204 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) possible_status = set(DEPOSIT_STATUS_DETAIL.keys()) - set(['success']) for _status in possible_status: response = self.client.put( url, content_type='application/json', data=json.dumps({'status': _status})) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit = Deposit.objects.get(pk=self.deposit.id) self.assertEquals(deposit.status, _status) @istest def update_deposit_with_success_ingestion_and_swh_id(self): """Existing status for update should return a 204 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) expected_status = 'success' - revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e' - expected_id = 'swh-hal-%s' % revision_id + expected_id = revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e' response = self.client.put( url, content_type='application/json', data=json.dumps({ 'status': expected_status, 'revision_id': revision_id, })) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit = Deposit.objects.get(pk=self.deposit.id) self.assertEquals(deposit.status, expected_status) self.assertEquals(deposit.swh_id, expected_id) @istest def update_deposit_status_will_fail_with_unknown_status(self): """Unknown status for update should return a 400 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'status': 'unknown'})) self.assertEqual(response.status_code, 
status.HTTP_400_BAD_REQUEST) @istest def update_deposit_status_will_fail_with_no_status_key(self): """No status provided for update should return a 400 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'something': 'something'})) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @istest def update_deposit_status_success_without_swh_id_fail(self): """Providing 'success' status without swh_id should return a 400 """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'status': 'success'})) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
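The change above stores the bare revision hash in `swh_id` instead of the former `swh-<collection>-<revision>` string. The sketch below suggests one reason the removed `split('-')[2]` parsing could be fragile. It re-declares the two deleted helpers locally for illustration. The collection name `hal-inria` is a hypothetical value, and `bytes.fromhex` is used here only as a standard-library stand-in for `hashutil.hash_to_bytes`.

```python
def format_swh_id(collection_name, revision_id):
    """Former identifier format, as in the removed helper."""
    return 'swh-%s-%s' % (collection_name, revision_id)


def previous_revision_id(swh_id):
    """Former parsing, as in the removed helper."""
    if swh_id:
        return swh_id.split('-')[2]
    return None


revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e'

# The round trip works when the collection name contains no hyphen.
simple = previous_revision_id(format_swh_id('hal', revision_id))

# Hypothetical hyphenated collection name: the third field is no longer
# the revision hash but a fragment of the collection name.
broken = previous_revision_id(format_swh_id('hal-inria', revision_id))

# After the change, swh_id is the revision id itself, so it can be
# converted to bytes directly, without any parsing step.
new_swh_id = revision_id
parent_bytes = bytes.fromhex(new_swh_id)
```

Storing the raw hash avoids coupling the identifier to the naming of collections, at the cost of no longer carrying the collection name inside the identifier.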