diff --git a/swh/deposit/api/common.py b/swh/deposit/api/common.py index a53b2d6e..dbff46e0 100644 --- a/swh/deposit/api/common.py +++ b/swh/deposit/api/common.py @@ -1,910 +1,901 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib from typing import Any, Tuple from abc import ABCMeta, abstractmethod from django.urls import reverse from django.http import HttpResponse from django.shortcuts import render from django.utils import timezone from rest_framework import status from rest_framework.authentication import BasicAuthentication from rest_framework.permissions import IsAuthenticated from rest_framework.views import APIView from swh.model import hashutil from swh.scheduler.utils import create_oneshot_task_dict -from swh.deposit.utils import origin_url_from from ..config import ( SWHDefaultConfig, EDIT_SE_IRI, EM_IRI, CONT_FILE_IRI, ARCHIVE_KEY, METADATA_KEY, RAW_METADATA_KEY, STATE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, - DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, + PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_LOAD_SUCCESS, ARCHIVE_TYPE, METADATA_TYPE ) from ..errors import ( MAX_UPLOAD_SIZE_EXCEEDED, BAD_REQUEST, ERROR_CONTENT, CHECKSUM_MISMATCH, make_error_dict, MEDIATION_NOT_ALLOWED, make_error_response_from_dict, FORBIDDEN, NOT_FOUND, make_error_response, METHOD_NOT_ALLOWED, ParserError, PARSING_ERROR ) from ..models import ( Deposit, DepositRequest, DepositCollection, DepositClient ) from ..parsers import parse_xml ACCEPT_PACKAGINGS = ['http://purl.org/net/sword/package/SimpleZip'] ACCEPT_ARCHIVE_CONTENT_TYPES = ['application/zip', 'application/x-tar'] class SWHAPIView(APIView): """Mixin intended as a base API view to enforce the basic authentication check """ authentication_classes = (BasicAuthentication, ) # type: Tuple[Any, ...] permission_classes = (IsAuthenticated, ) class SWHBaseDeposit(SWHDefaultConfig, SWHAPIView, metaclass=ABCMeta): """Base deposit request class sharing multiple common behaviors. """ def _read_headers(self, req): """Read and unify the necessary headers from the request (those are not stored in the same location or not properly formatted).
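For example (illustrative): an In-Progress: true header arrives in the request as META['HTTP_IN_PROGRESS'] and is exposed here as the boolean entry headers['in-progress'], while a Content-MD5 hex digest is decoded to bytes under headers['content-md5sum'].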
Args: req (Request): Input request Returns: Dictionary with the following keys (some associated values may be None): - content-type - content-length - in-progress - content-disposition - content-md5sum - packaging - slug - on-behalf-of - metadata-relevant """ meta = req._request.META content_type = req.content_type content_length = meta.get('CONTENT_LENGTH') if content_length and isinstance(content_length, str): content_length = int(content_length) # final deposit if not provided in_progress = meta.get('HTTP_IN_PROGRESS', False) content_disposition = meta.get('HTTP_CONTENT_DISPOSITION') if isinstance(in_progress, str): in_progress = in_progress.lower() == 'true' content_md5sum = meta.get('HTTP_CONTENT_MD5') if content_md5sum: content_md5sum = bytes.fromhex(content_md5sum) packaging = meta.get('HTTP_PACKAGING') slug = meta.get('HTTP_SLUG') on_behalf_of = meta.get('HTTP_ON_BEHALF_OF') metadata_relevant = meta.get('HTTP_METADATA_RELEVANT') return { 'content-type': content_type, 'content-length': content_length, 'in-progress': in_progress, 'content-disposition': content_disposition, 'content-md5sum': content_md5sum, 'packaging': packaging, 'slug': slug, 'on-behalf-of': on_behalf_of, 'metadata-relevant': metadata_relevant, } def _compute_md5(self, filehandler): """Compute uploaded file's md5 sum. Args: filehandler (InMemoryUploadedFile): the file to compute the md5 hash Returns: the md5 checksum (bytes) """ h = hashlib.md5() for chunk in filehandler: h.update(chunk) return h.digest() def _deposit_put(self, req, deposit_id=None, in_progress=False, external_id=None): """Save/Update a deposit in db. Args: req (Request): the originating request deposit_id (int): deposit identifier in_progress (bool): whether the deposit is still in progress (partial) or complete (deposited) external_id (str): The external identifier to associate to the deposit Returns: The Deposit instance saved or updated. """ if in_progress is False: complete_date = timezone.now() status_type = DEPOSIT_STATUS_DEPOSITED else: complete_date = None status_type = DEPOSIT_STATUS_PARTIAL if not deposit_id: try: # find a deposit parent (same external id, status load # to success) deposit_parent = Deposit.objects.filter( external_id=external_id, status=DEPOSIT_STATUS_LOAD_SUCCESS).order_by('-id')[0:1].get() # noqa except Deposit.DoesNotExist: deposit_parent = None deposit = Deposit(collection=self._collection, external_id=external_id, complete_date=complete_date, status=status_type, client=self._client, parent=deposit_parent) else: deposit = Deposit.objects.get(pk=deposit_id) # update metadata deposit.complete_date = complete_date deposit.status = status_type if self.config['checks']: deposit.save() # needed to have a deposit id args = [deposit.collection.name, deposit.id] scheduler = self.scheduler if (deposit.status == DEPOSIT_STATUS_DEPOSITED and not deposit.check_task_id): check_url = req.build_absolute_uri( reverse(PRIVATE_CHECK_DEPOSIT, args=args)) task = create_oneshot_task_dict( 'check-deposit', deposit_check_url=check_url) check_task_id = scheduler.create_tasks([task])[0]['id'] deposit.check_task_id = check_task_id - elif (deposit.status == DEPOSIT_STATUS_VERIFIED and - not deposit.load_task_id): - - url = origin_url_from(deposit) - task = create_oneshot_task_dict( - 'load-deposit', url=url, deposit_id=deposit.id) - load_task_id = scheduler.create_task([task])[0]['id'] - deposit.load_task_id = load_task_id deposit.save() return deposit def _deposit_request_put(self, deposit, deposit_request_data, replace_metadata=False, replace_archives=False): """Save a deposit request with metadata attached to a deposit.
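For example (shape illustrative): a multipart deposit builds {ARCHIVE_KEY: filehandler, METADATA_KEY: parsed_metadata, RAW_METADATA_KEY: raw_metadata}, while a binary upload only carries the ARCHIVE_KEY entry.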
Args: deposit (Deposit): The deposit concerned by the request deposit_request_data (dict): The dictionary with at most 2 deposit request types (archive, metadata) to associate to the deposit replace_metadata (bool): Flag defining if we add or update existing metadata to the deposit replace_archives (bool): Flag defining if we add or update archives to existing deposit Returns: None """ if replace_metadata: DepositRequest.objects.filter( deposit=deposit, type=METADATA_TYPE).delete() if replace_archives: DepositRequest.objects.filter( deposit=deposit, type=ARCHIVE_TYPE).delete() deposit_request = None archive_file = deposit_request_data.get(ARCHIVE_KEY) if archive_file: deposit_request = DepositRequest( type=ARCHIVE_TYPE, deposit=deposit, archive=archive_file) deposit_request.save() metadata = deposit_request_data.get(METADATA_KEY) if metadata: raw_metadata = deposit_request_data.get(RAW_METADATA_KEY) deposit_request = DepositRequest( type=METADATA_TYPE, deposit=deposit, metadata=metadata, raw_metadata=raw_metadata) deposit_request.save() assert deposit_request is not None def _delete_archives(self, collection_name, deposit_id): """Delete archives reference from the deposit id. """ try: deposit = Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: return make_error_dict( NOT_FOUND, 'The deposit %s does not exist' % deposit_id) DepositRequest.objects.filter( deposit=deposit, type=ARCHIVE_TYPE).delete() return {} def _delete_deposit(self, collection_name, deposit_id): """Delete deposit reference. Args: collection_name (str): Client's name deposit_id (id): The deposit to delete Returns Empty dict when ok. Dict with error key to describe the failure. """ try: deposit = Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: return make_error_dict( NOT_FOUND, 'The deposit %s does not exist' % deposit_id) if deposit.collection.name != collection_name: summary = 'Cannot delete a deposit from another collection' description = "Deposit %s does not belong to the collection %s" % ( deposit_id, collection_name) return make_error_dict( BAD_REQUEST, summary=summary, verbose_description=description) DepositRequest.objects.filter(deposit=deposit).delete() deposit.delete() return {} def _check_preconditions_on(self, filehandler, md5sum, content_length=None): """Check that the preconditions on the provided file are respected, i.e. that the length and/or the md5sum hash match the file's content. Args: filehandler (InMemoryUploadedFile): The file to check md5sum (hex str): md5 hash expected from the file's content content_length (int): the expected length if provided. Returns: Either None if no error or a dictionary with a key error detailing the problem. """ if content_length: if content_length > self.config['max_upload_size']: return make_error_dict( MAX_UPLOAD_SIZE_EXCEEDED, 'Upload size limit exceeded (max %s bytes).' % self.config['max_upload_size'], 'Please consider sending the archive in ' 'multiple steps.') length = filehandler.size if length != content_length: return make_error_dict(status.HTTP_412_PRECONDITION_FAILED, 'Wrong length') if md5sum: _md5sum = self._compute_md5(filehandler) if _md5sum != md5sum: return make_error_dict( CHECKSUM_MISMATCH, 'Wrong md5 hash', 'The checksum sent %s and the actual checksum ' '%s do not match.' % (hashutil.hash_to_hex(md5sum), hashutil.hash_to_hex(_md5sum))) return None def _binary_upload(self, req, headers, collection_name, deposit_id=None, replace_metadata=False, replace_archives=False): """Binary upload routine.
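A typical request (headers illustrative) sends a Content-Type among the accepted archive types, a Content-Length, a Content-Disposition: attachment; filename=... header, a Slug carrying the external identifier, an optional Content-MD5 checksum and In-Progress: false.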
A request with an unsupported media type receives a 415 response. Args: req (Request): the request holding information to parse and inject in db headers (dict): request headers formatted collection_name (str): the associated client deposit_id (id): deposit identifier if provided replace_metadata (bool): 'Update or add' request to existing deposit. If False (default), this adds new metadata request to existing ones. Otherwise, this will replace existing metadata. replace_archives (bool): 'Update or add' request to existing deposit. If False (default), this adds new archive request to existing ones. Otherwise, this will replace existing archives. Returns: In the optimal case a dict with the following keys: - deposit_id (int): Deposit identifier - deposit_date (date): Deposit date - archive (str): the name of the uploaded archive Otherwise, a dictionary with the key error and the associated failures, either: - 400 (bad request) if the request is not providing an external identifier - 413 (request entity too large) if the length of the archive exceeds the max size configured - 412 (precondition failed) if the provided length or md5 hash does not match the archive's actual content - 415 (unsupported media type) if a wrong media type is provided """ content_length = headers['content-length'] if not content_length: return make_error_dict( BAD_REQUEST, 'CONTENT_LENGTH header is mandatory', 'For archive deposit, the ' 'CONTENT_LENGTH header must be sent.') content_disposition = headers['content-disposition'] if not content_disposition: return make_error_dict( BAD_REQUEST, 'CONTENT_DISPOSITION header is mandatory', 'For archive deposit, the ' 'CONTENT_DISPOSITION header must be sent.') packaging = headers['packaging'] if packaging and packaging not in ACCEPT_PACKAGINGS: return make_error_dict( BAD_REQUEST, 'Only packaging %s is supported' % ACCEPT_PACKAGINGS, 'The packaging provided %s is not supported' % packaging) filehandler = req.FILES['file'] precondition_status_response = self._check_preconditions_on( filehandler, headers['content-md5sum'], content_length) if precondition_status_response: return precondition_status_response external_id = headers['slug'] # actual storage of data archive_metadata = filehandler deposit = self._deposit_put(req, deposit_id=deposit_id, in_progress=headers['in-progress'], external_id=external_id) self._deposit_request_put( deposit, {ARCHIVE_KEY: archive_metadata}, replace_metadata=replace_metadata, replace_archives=replace_archives) return { 'deposit_id': deposit.id, 'deposit_date': deposit.reception_date, 'status': deposit.status, 'archive': filehandler.name, } def _read_metadata(self, metadata_stream): """Given a metadata stream, reads the metadata and returns both the parsed and the raw metadata. """ raw_metadata = metadata_stream.read() metadata = parse_xml(raw_metadata) return raw_metadata, metadata def _multipart_upload(self, req, headers, collection_name, deposit_id=None, replace_metadata=False, replace_archives=False): """Multipart upload supported with exactly: - 1 archive (zip) - 1 atom entry Other than such a request, a 415 response is returned. Args: req (Request): the request holding information to parse and inject in db headers (dict): request headers formatted collection_name (str): the associated client deposit_id (id): deposit identifier if provided replace_metadata (bool): 'Update or add' request to existing deposit. If False (default), this adds new metadata request to existing ones. Otherwise, this will replace existing metadata.
replace_archives (bool): 'Update or add' request to existing deposit. If False (default), this adds new archive request to existing ones. Otherwise, this will replace existing archives. Returns: In the optimal case a dict with the following keys: - deposit_id (int): Deposit identifier - deposit_date (date): Deposit date - archive (str): the name of the uploaded archive Otherwise, a dictionary with the key error and the associated failures, either: - 400 (bad request) if the request is not providing an external identifier - 412 (precondition failed) if the md5 hash, when provided, does not match the archive's actual content - 413 (request entity too large) if the length of the archive exceeds the max size configured - 415 (unsupported media type) if a wrong media type is provided """ external_id = headers['slug'] content_types_present = set() data = { 'application/zip': None, # expected either zip 'application/x-tar': None, # or x-tar 'application/atom+xml': None, } for key, value in req.FILES.items(): fh = value if fh.content_type in content_types_present: return make_error_dict( ERROR_CONTENT, 'Only 1 application/zip (or application/x-tar) archive ' 'and 1 atom+xml entry is supported (as per sword2.0 ' 'specification)', 'You provided more than 1 application/(zip|x-tar) ' 'or more than 1 application/atom+xml content-disposition ' 'header in the multipart deposit') content_types_present.add(fh.content_type) data[fh.content_type] = fh if len(content_types_present) != 2: return make_error_dict( ERROR_CONTENT, 'You must provide both 1 application/zip (or ' 'application/x-tar) and 1 atom+xml entry for multipart ' 'deposit', 'You need to provide only 1 application/(zip|x-tar) ' 'and 1 application/atom+xml content-disposition header ' 'in the multipart deposit') filehandler = data['application/zip'] if not filehandler: filehandler = data['application/x-tar'] precondition_status_response = self._check_preconditions_on( filehandler, headers['content-md5sum']) if precondition_status_response: return precondition_status_response try: raw_metadata, metadata = self._read_metadata( data['application/atom+xml']) except ParserError: return make_error_dict( PARSING_ERROR, 'Malformed xml metadata', "The xml received is malformed. " "Please ensure your metadata file is correctly formatted.") # actual storage of data deposit = self._deposit_put(req, deposit_id=deposit_id, in_progress=headers['in-progress'], external_id=external_id) deposit_request_data = { ARCHIVE_KEY: filehandler, METADATA_KEY: metadata, RAW_METADATA_KEY: raw_metadata, } self._deposit_request_put( deposit, deposit_request_data, replace_metadata, replace_archives) return { 'deposit_id': deposit.id, 'deposit_date': deposit.reception_date, 'archive': filehandler.name, 'status': deposit.status, } def _atom_entry(self, req, headers, collection_name, deposit_id=None, replace_metadata=False, replace_archives=False): """Atom entry deposit. Args: req (Request): the request holding information to parse and inject in db headers (dict): request headers formatted collection_name (str): the associated client deposit_id (id): deposit identifier if provided replace_metadata (bool): 'Update or add' request to existing deposit. If False (default), this adds new metadata request to existing ones. Otherwise, this will replace existing metadata. replace_archives (bool): 'Update or add' request to existing deposit. If False (default), this adds new archive request to existing ones. Otherwise, this will replace existing archives.
Returns: In the optimal case a dict with the following keys: - deposit_id: deposit id associated to the deposit - deposit_date: date of the deposit - archive: None (no archive is provided here) Otherwise, a dictionary with the key error and the associated failures, either: - 400 (bad request) if the request is not providing an external identifier - 400 (bad request) if the request's body is empty - 415 (unsupported media type) if a wrong media type is provided """ try: raw_metadata, metadata = self._read_metadata(req.data) except ParserError: return make_error_dict( BAD_REQUEST, 'Malformed xml metadata', "The xml received is malformed. " "Please ensure your metadata file is correctly formatted.") if not metadata: return make_error_dict( BAD_REQUEST, 'Empty body request is not supported', 'Atom entry deposit is supposed to send metadata. ' 'If the body is empty, there is no metadata.') external_id = metadata.get('external_identifier', headers['slug']) deposit = self._deposit_put(req, deposit_id=deposit_id, in_progress=headers['in-progress'], external_id=external_id) self._deposit_request_put( deposit, {METADATA_KEY: metadata, RAW_METADATA_KEY: raw_metadata}, replace_metadata, replace_archives) return { 'deposit_id': deposit.id, 'deposit_date': deposit.reception_date, 'archive': None, 'status': deposit.status, } def _empty_post(self, req, headers, collection_name, deposit_id): """Empty post to finalize an empty deposit. Args: req (Request): the request holding information to parse and inject in db headers (dict): request headers formatted collection_name (str): the associated client deposit_id (id): deposit identifier Returns: Dictionary of result with the deposit's id, the date it was completed and no archive. """ deposit = Deposit.objects.get(pk=deposit_id) deposit.complete_date = timezone.now() deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() return { 'deposit_id': deposit_id, 'deposit_date': deposit.complete_date, 'status': deposit.status, 'archive': None, } def _make_iris(self, req, collection_name, deposit_id): """Define the IRI endpoints Args: req (Request): The initial request collection_name (str): client/collection's name deposit_id (id): Deposit identifier Returns: Dictionary mapping IRI names to their URLs. """ args = [collection_name, deposit_id] return { iri: req.build_absolute_uri(reverse(iri, args=args)) for iri in [EM_IRI, EDIT_SE_IRI, CONT_FILE_IRI, STATE_IRI] } def additional_checks(self, req, headers, collection_name, deposit_id=None): """Allow child classes to perform additional checks. Returns: Empty dict if no error, otherwise a dict with an 'error' key detailing the problem.
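For example (illustrative), a child class could return make_error_dict(BAD_REQUEST, 'Missing slug header') to reject the request, or {} to accept it.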
""" return {} def checks(self, req, collection_name, deposit_id=None): try: self._collection = DepositCollection.objects.get( name=collection_name) except DepositCollection.DoesNotExist: return make_error_dict( NOT_FOUND, 'Unknown collection name %s' % collection_name) username = req.user.username if username: # unauthenticated request can have the username empty try: self._client = DepositClient.objects.get(username=username) except DepositClient.DoesNotExist: return make_error_dict(NOT_FOUND, 'Unknown client name %s' % username) if self._collection.id not in self._client.collections: return make_error_dict( FORBIDDEN, 'Client %s cannot access collection %s' % ( username, collection_name)) if deposit_id: try: deposit = Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: return make_error_dict( NOT_FOUND, 'Deposit with id %s does not exist' % deposit_id) checks = self.restrict_access(req, deposit) if checks: return checks headers = self._read_headers(req) if headers['on-behalf-of']: return make_error_dict(MEDIATION_NOT_ALLOWED, 'Mediation is not supported.') checks = self.additional_checks(req, headers, collection_name, deposit_id) if 'error' in checks: return checks return {'headers': headers} def restrict_access(self, req, deposit=None): if deposit: if (req.method != 'GET' and deposit.status != DEPOSIT_STATUS_PARTIAL): summary = "You can only act on deposit with status '%s'" % ( DEPOSIT_STATUS_PARTIAL, ) description = "This deposit has status '%s'" % deposit.status return make_error_dict( BAD_REQUEST, summary=summary, verbose_description=description) def _basic_not_allowed_method(self, req, method): return make_error_response( req, METHOD_NOT_ALLOWED, '%s method is not supported on this endpoint' % method) def get(self, req, *args, **kwargs): return self._basic_not_allowed_method(req, 'GET') def post(self, req, *args, **kwargs): return self._basic_not_allowed_method(req, 'POST') def put(self, req, *args, **kwargs): return self._basic_not_allowed_method(req, 'PUT') def delete(self, req, *args, **kwargs): return self._basic_not_allowed_method(req, 'DELETE') class SWHGetDepositAPI(SWHBaseDeposit, metaclass=ABCMeta): """Mixin for class to support GET method. """ def get(self, req, collection_name, deposit_id, format=None): """Endpoint to create/add resources to deposit. Returns: 200 response when no error during routine occurred 400 if the deposit does not belong to the collection 404 if the deposit or the collection does not exist """ checks = self.checks(req, collection_name, deposit_id) if 'error' in checks: return make_error_response_from_dict(req, checks['error']) r = self.process_get( req, collection_name, deposit_id) if isinstance(r, tuple): status, content, content_type = r return HttpResponse(content, status=status, content_type=content_type) return r @abstractmethod def process_get(self, req, collection_name, deposit_id): """Routine to deal with the deposit's get processing. Returns: Tuple status, stream of content, content-type """ pass class SWHPostDepositAPI(SWHBaseDeposit, metaclass=ABCMeta): """Mixin for class to support DELETE method. """ def post(self, req, collection_name, deposit_id=None, format=None): """Endpoint to create/add resources to deposit. Returns: 204 response when no error during routine occurred. 
400 if the deposit does not belong to the collection 404 if the deposit or the collection does not exist """ checks = self.checks(req, collection_name, deposit_id) if 'error' in checks: return make_error_response_from_dict(req, checks['error']) headers = checks['headers'] _status, _iri_key, data = self.process_post( req, headers, collection_name, deposit_id) error = data.get('error') if error: return make_error_response_from_dict(req, error) data['packagings'] = ACCEPT_PACKAGINGS iris = self._make_iris(req, collection_name, data['deposit_id']) data.update(iris) response = render(req, 'deposit/deposit_receipt.xml', context=data, content_type='application/xml', status=_status) response._headers['location'] = 'Location', data[_iri_key] return response @abstractmethod def process_post(self, req, headers, collection_name, deposit_id=None): """Routine to deal with the deposit's processing. Returns Tuple of: - response status code (200, 201, etc...) - key iri (EM_IRI, EDIT_SE_IRI, etc...) - dictionary of the processing result """ pass class SWHPutDepositAPI(SWHBaseDeposit, metaclass=ABCMeta): """Mixin for class to support PUT method. """ def put(self, req, collection_name, deposit_id, format=None): """Endpoint to update deposit resources. Returns: 204 response when no error during routine occurred. 400 if the deposit does not belong to the collection 404 if the deposit or the collection does not exist """ checks = self.checks(req, collection_name, deposit_id) if 'error' in checks: return make_error_response_from_dict(req, checks['error']) headers = checks['headers'] data = self.process_put(req, headers, collection_name, deposit_id) error = data.get('error') if error: return make_error_response_from_dict(req, error) return HttpResponse(status=status.HTTP_204_NO_CONTENT) @abstractmethod def process_put(self, req, headers, collection_name, deposit_id): """Routine to deal with updating a deposit in some way. Returns dictionary of the processing result """ pass class SWHDeleteDepositAPI(SWHBaseDeposit, metaclass=ABCMeta): """Mixin for class to support DELETE method. """ def delete(self, req, collection_name, deposit_id): """Endpoint to delete some deposit's resources (archives, deposit). Returns: 204 response when no error during routine occurred. 400 if the deposit does not belong to the collection 404 if the deposit or the collection does not exist """ checks = self.checks(req, collection_name, deposit_id) if 'error' in checks: return make_error_response_from_dict(req, checks['error']) data = self.process_delete(req, collection_name, deposit_id) error = data.get('error') if error: return make_error_response_from_dict(req, error) return HttpResponse(status=status.HTTP_204_NO_CONTENT) @abstractmethod def process_delete(self, req, collection_name, deposit_id): """Routine to delete a resource. This is mostly not allowed except for the EM_IRI (cf. 
.api.deposit_update.SWHUpdateArchiveDeposit) """ pass diff --git a/swh/deposit/api/private/__init__.py b/swh/deposit/api/private/__init__.py index 4ed1154d..f0f86945 100644 --- a/swh/deposit/api/private/__init__.py +++ b/swh/deposit/api/private/__init__.py @@ -1,94 +1,94 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.deposit import utils -from ...config import METADATA_TYPE +from ...config import METADATA_TYPE, SWHDefaultConfig from ...models import DepositRequest, Deposit from rest_framework.permissions import AllowAny from swh.deposit.api.common import SWHAPIView from swh.deposit.errors import make_error_dict, NOT_FOUND class DepositReadMixin: """Deposit Read mixin """ def _deposit_requests(self, deposit, request_type): """Given a deposit, yield its associated deposit requests Args: deposit (Deposit): Deposit to list requests for request_type (str): 'archive' or 'metadata' Yields: deposit requests of type request_type associated to the deposit """ if isinstance(deposit, int): deposit = Deposit.objects.get(pk=deposit) deposit_requests = DepositRequest.objects.filter( type=request_type, deposit=deposit).order_by('id') for deposit_request in deposit_requests: yield deposit_request def _metadata_get(self, deposit): """Given a deposit, aggregate all metadata requests. Args: deposit (Deposit): The deposit instance to extract metadata from. Returns: metadata dict from the deposit. """ metadata = (m.metadata for m in self._deposit_requests( deposit, request_type=METADATA_TYPE)) return utils.merge(*metadata) -class SWHPrivateAPIView(SWHAPIView): +class SWHPrivateAPIView(SWHDefaultConfig, SWHAPIView): """Mixin intended as a base API view for the private API endpoints (hence no authentication is required). """ authentication_classes = () permission_classes = (AllowAny, ) def checks(self, req, collection_name, deposit_id=None): """Override default checks implementation to allow empty collection. """ if deposit_id: try: Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: return make_error_dict( NOT_FOUND, 'Deposit with id %s does not exist' % deposit_id) headers = self._read_headers(req) checks = self.additional_checks( req, headers, collection_name, deposit_id) if 'error' in checks: return checks return {'headers': headers} def get(self, req, collection_name=None, deposit_id=None, format=None, *args, **kwargs): return super().get(req, collection_name, deposit_id, format) def put(self, req, collection_name=None, deposit_id=None, format=None, *args, **kwargs): return super().put(req, collection_name, deposit_id, format) diff --git a/swh/deposit/api/private/deposit_check.py b/swh/deposit/api/private/deposit_check.py index 8961d914..7f0387fa 100644 --- a/swh/deposit/api/private/deposit_check.py +++ b/swh/deposit/api/private/deposit_check.py @@ -1,209 +1,228 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import re import tarfile import zipfile +from itertools import chain +from shutil import get_unpack_formats + from rest_framework import status +from swh.scheduler.utils import create_oneshot_task_dict from .
import DepositReadMixin, SWHPrivateAPIView from ..common import SWHGetDepositAPI from ...config import DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED from ...config import ARCHIVE_TYPE from ...models import Deposit MANDATORY_FIELDS_MISSING = 'Mandatory fields are missing' ALTERNATE_FIELDS_MISSING = 'Mandatory alternate fields are missing' MANDATORY_ARCHIVE_UNREADABLE = 'At least one of its associated archives is not readable' # noqa MANDATORY_ARCHIVE_INVALID = 'Mandatory archive is invalid (i.e. it contains only one archive)' # noqa MANDATORY_ARCHIVE_UNSUPPORTED = 'Mandatory archive type is not supported' MANDATORY_ARCHIVE_MISSING = 'Deposit without archive is rejected' ARCHIVE_EXTENSIONS = [ 'zip', 'tar', 'tar.gz', 'xz', 'tar.xz', 'bz2', 'tar.bz2', 'Z', 'tar.Z', 'tgz', '7z' ] PATTERN_ARCHIVE_EXTENSION = re.compile( r'.*\.(%s)$' % '|'.join(ARCHIVE_EXTENSIONS)) +def known_archive_format(filename): + return any(filename.endswith(t) for t in + chain(*(x[1] for x in get_unpack_formats()))) + + class SWHChecksDeposit(SWHPrivateAPIView, SWHGetDepositAPI, DepositReadMixin): """Dedicated class to check a deposit's content (associated archives and metadata). Only GET is supported. """ def _check_deposit_archives(self, deposit): """Given a deposit, check each deposit request of type archive. Args: deposit (Deposit): The deposit to check archives for Returns tuple (status, error_detail): (True, None) if all archives are ok, (False, error_detail) otherwise. """ requests = list(self._deposit_requests( deposit, request_type=ARCHIVE_TYPE)) if len(requests) == 0: # no associated archive is refused return False, { 'archive': [{ 'summary': MANDATORY_ARCHIVE_MISSING, }] } errors = [] for archive_request in requests: check, error_message = self._check_archive(archive_request) if not check: errors.append({ 'summary': error_message, 'fields': [archive_request.id] }) if not errors: return True, None return False, { 'archive': errors } def _check_archive(self, archive_request): """Check that a deposit associated archive is ok: - readable - supported archive format - valid content: the archive does not contain a single archive file If any of those checks are not ok, return the corresponding failing check. Args: archive_request (DepositRequest): Archive to check Returns: (True, None) if archive is check compliant, (False, error_message) otherwise. """ archive_path = archive_request.archive.path + + if not known_archive_format(archive_path): + return False, MANDATORY_ARCHIVE_UNSUPPORTED + try: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path) as f: files = f.namelist() elif tarfile.is_tarfile(archive_path): with tarfile.open(archive_path) as f: files = f.getnames() else: return False, MANDATORY_ARCHIVE_UNSUPPORTED except Exception: return False, MANDATORY_ARCHIVE_UNREADABLE if len(files) > 1: return True, None element = files[0] if PATTERN_ARCHIVE_EXTENSION.match(element): # archive in archive! return False, MANDATORY_ARCHIVE_INVALID return True, None def _check_metadata(self, metadata): """Check to execute on all metadata for mandatory field presence. Args: metadata (dict): Metadata dictionary to check for mandatory fields Returns: tuple (status, error_detail): (True, None) if metadata are ok, (False, error_detail) otherwise.
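For example (illustrative): metadata missing an author field yields (False, {'metadata': [{'summary': MANDATORY_FIELDS_MISSING, 'fields': ['author']}]}).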
""" required_fields = { 'author': False, } alternate_fields = { ('name', 'title'): False, # alternate field, at least one # of them must be present } for field, value in metadata.items(): for name in required_fields: if name in field: required_fields[name] = True for possible_names in alternate_fields: for possible_name in possible_names: if possible_name in field: alternate_fields[possible_names] = True continue mandatory_result = [k for k, v in required_fields.items() if not v] optional_result = [ ' or '.join(k) for k, v in alternate_fields.items() if not v] if mandatory_result == [] and optional_result == []: return True, None detail = [] if mandatory_result != []: detail.append({ 'summary': MANDATORY_FIELDS_MISSING, 'fields': mandatory_result }) if optional_result != []: detail.append({ 'summary': ALTERNATE_FIELDS_MISSING, 'fields': optional_result, }) return False, { 'metadata': detail } def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ deposit = Deposit.objects.get(pk=deposit_id) metadata = self._metadata_get(deposit) problems = {} # will check each deposit's associated request (both of type # archive and metadata) for errors archives_status, error_detail = self._check_deposit_archives(deposit) if not archives_status: problems.update(error_detail) metadata_status, error_detail = self._check_metadata(metadata) if not metadata_status: problems.update(error_detail) deposit_status = archives_status and metadata_status # if any problems arose, the deposit is rejected if not deposit_status: deposit.status = DEPOSIT_STATUS_REJECTED deposit.status_detail = problems response = { 'status': deposit.status, 'details': deposit.status_detail, } else: deposit.status = DEPOSIT_STATUS_VERIFIED response = { 'status': deposit.status, } + if not deposit.load_task_id and self.config['checks']: + url = deposit.origin_url + task = create_oneshot_task_dict( + 'load-deposit', url=url, deposit_id=deposit.id) + load_task_id = self.scheduler.create_tasks([task])[0]['id'] + deposit.load_task_id = load_task_id deposit.save() return status.HTTP_200_OK, json.dumps(response), 'application/json' diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index 8b834b04..c9a575c5 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,234 +1,233 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import tempfile from contextlib import contextmanager from django.http import FileResponse from rest_framework import status from swh.core import tarball from swh.model import identifiers from swh.deposit.utils import normalize_date -from swh.deposit import utils from . import DepositReadMixin, SWHPrivateAPIView from ...config import SWH_PERSON, ARCHIVE_TYPE from ..common import SWHGetDepositAPI from ...models import Deposit @contextmanager def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. 
Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Yields: The archive path (a new aggregated archive when several were received, the single original one otherwise) """ if len(archive_paths) > 1: # need to rebuild one archive from multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.deposit-', dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = tarball.compress( aggregated_tarball_rootdir + '.zip', nature='zip', dirpath_or_files=aggregated_tarball_rootdir) # can already clean up temporary directory shutil.rmtree(aggregated_tarball_rootdir) try: yield temp_tarpath finally: shutil.rmtree(dir_path) else: # only 1 archive, no need to do fancy actions (and no cleanup step) yield archive_paths[0] class SWHDepositReadArchives(SWHPrivateAPIView, SWHGetDepositAPI, DepositReadMixin): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ ADDITIONAL_CONFIG = { 'extraction_dir': ('str', '/tmp/swh-deposit/archive/'), } def __init__(self): super().__init__() self.extraction_dir = self.config['extraction_dir'] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ archive_paths = [r.archive.path for r in self._deposit_requests( deposit_id, request_type=ARCHIVE_TYPE)] with aggregate_tarballs(self.extraction_dir, archive_paths) as path: return FileResponse(open(path, 'rb'), status=status.HTTP_200_OK, content_type='application/octet-stream') class SWHDepositReadMetadata(SWHPrivateAPIView, SWHGetDepositAPI, DepositReadMixin): """Class in charge of aggregating metadata on a deposit. """ ADDITIONAL_CONFIG = { 'provider': ('dict', { # 'provider_name': '', # those are not set since read from the # 'provider_url': '', # deposit's client 'provider_type': 'deposit_client', 'metadata': {} }), 'tool': ('dict', { 'name': 'swh-deposit', 'version': '0.0.1', 'configuration': { 'sword_version': '2' } }) } def __init__(self): super().__init__() self.provider = self.config['provider'] self.tool = self.config['tool'] def _normalize_dates(self, deposit, metadata): """Normalize the date to use as a tuple of author date, committer date from the incoming metadata. Args: deposit (Deposit): Deposit model representation metadata (Dict): Metadata dict representation Returns: Tuple of author date, committer date. Those dates are swh-normalized.
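For example (illustrative), metadata holding only 'codemeta:datePublished': '2017-10-08' yields that date, normalized, as both the author and committer dates; when neither date is present, the deposit's complete_date is used for both.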
""" commit_date = metadata.get('codemeta:datePublished') author_date = metadata.get('codemeta:dateCreated') if author_date and commit_date: pass elif commit_date: author_date = commit_date elif author_date: commit_date = author_date else: author_date = deposit.complete_date commit_date = deposit.complete_date return ( normalize_date(author_date), normalize_date(commit_date) ) def metadata_read(self, deposit): """Read and aggregate multiple data on deposit into one unified data dictionary. Args: deposit (Deposit): Deposit concerned by the data aggregation. Returns: Dictionary of data representing the deposit to inject in swh. """ metadata = self._metadata_get(deposit) # Read information metadata data = { 'origin': { 'type': 'deposit', - 'url': utils.origin_url_from(deposit), + 'url': deposit.origin_url, } } # revision fullname = deposit.client.username author_committer = SWH_PERSON # metadata provider self.provider['provider_name'] = deposit.client.last_name self.provider['provider_url'] = deposit.client.provider_url revision_type = 'tar' revision_msg = '%s: Deposit %s in collection %s' % ( fullname, deposit.id, deposit.collection.name) author_date, commit_date = self._normalize_dates(deposit, metadata) data['revision'] = { 'synthetic': True, 'date': author_date, 'committer_date': commit_date, 'author': author_committer, 'committer': author_committer, 'type': revision_type, 'message': revision_msg, 'metadata': metadata, } if deposit.parent: swh_persistent_id = deposit.parent.swh_id persistent_identifier = identifiers.parse_persistent_identifier( swh_persistent_id) parent_revision = persistent_identifier.object_id data['revision']['parents'] = [parent_revision] data['branch_name'] = 'master' data['origin_metadata'] = { 'provider': self.provider, 'tool': self.tool, 'metadata': metadata } return data def process_get(self, req, collection_name, deposit_id): deposit = Deposit.objects.get(pk=deposit_id) data = self.metadata_read(deposit) d = {} if data: d = json.dumps(data) return status.HTTP_200_OK, d, 'application/json' diff --git a/swh/deposit/client/__init__.py b/swh/deposit/client/__init__.py index b4b7f7e2..35b34193 100644 --- a/swh/deposit/client/__init__.py +++ b/swh/deposit/client/__init__.py @@ -1,580 +1,581 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining an swh-deposit client """ import hashlib import os import requests import xmltodict import logging from abc import ABCMeta, abstractmethod +from urllib.parse import urljoin from swh.core.config import SWHConfig logger = logging.getLogger(__name__) def _parse(stream, encoding='utf-8'): """Given a xml stream, parse the result. Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use if to decode the bytes stream Returns: A dict of values corresponding to the parsed xml """ if isinstance(stream, bytes): stream = stream.decode(encoding) data = xmltodict.parse(stream, encoding=encoding, process_namespaces=False) if 'entry' in data: data = data['entry'] if 'sword:error' in data: data = data['sword:error'] return dict(data) def _parse_with_filter(stream, encoding='utf-8', keys=[]): """Given a xml stream, parse the result and filter with keys. 
Args: stream (bytes/text): The stream to parse encoding (str): The encoding to use to decode a bytes stream keys ([str]): Keys to filter the parsed result Returns: A dict of values corresponding to the parsed xml filtered by the keys provided. """ data = _parse(stream, encoding=encoding) m = {} for key in keys: m[key] = data.get(key) return m class BaseApiDepositClient(SWHConfig): """Deposit client base class """ CONFIG_BASE_FILENAME = 'deposit/client' DEFAULT_CONFIG = { 'url': ('str', 'http://localhost:5006'), 'auth': ('dict', {}), # with optional 'username'/'password' keys } def __init__(self, config=None, _client=requests): super().__init__() if config is None: self.config = super().parse_config_file() else: self.config = config self._client = _client - self.base_url = self.config['url'] + self.base_url = self.config['url'].strip('/') + '/' auth = self.config['auth'] if auth == {}: self.auth = None else: self.auth = (auth['username'], auth['password']) def do(self, method, url, *args, **kwargs): """Internal method to deal with requests, possibly with basic http authentication. Args: method (str): an http method supported by the underlying client ('get', 'post', 'put', ...) Returns: The request's execution """ if hasattr(self._client, method): method_fn = getattr(self._client, method) else: raise ValueError('Development error, unsupported method %s' % ( method)) if self.auth: kwargs['auth'] = self.auth - full_url = '%s%s' % (self.base_url.rstrip('/'), url) + full_url = urljoin(self.base_url, url.lstrip('/')) return method_fn(full_url, *args, **kwargs) class PrivateApiDepositClient(BaseApiDepositClient): """Private API deposit client to: - read a given deposit's archive(s) - read a given deposit's metadata - update a given deposit's status """ def archive_get(self, archive_update_url, archive): """Retrieve the archive from the deposit to a local directory. Args: archive_update_url (str): The full deposit archive(s)'s raw content to retrieve locally archive (str): the local archive's path where to store the raw content Returns: The archive path to the local archive to load. Raises: ValueError when the retrieval fails. """ r = self.do('get', archive_update_url, stream=True) if r.ok: with open(archive, 'wb') as f: for chunk in r.iter_content(): f.write(chunk) return archive msg = 'Problem when retrieving deposit archive at %s' % ( archive_update_url, ) logger.error(msg) raise ValueError(msg) def metadata_get(self, metadata_url): """Retrieve the metadata information on a given deposit. Args: metadata_url (str): The full deposit metadata url to retrieve locally Returns: The dictionary of metadata for that deposit. Raises: ValueError when the retrieval fails. """ r = self.do('get', metadata_url) if r.ok: return r.json() msg = 'Problem when retrieving metadata at %s' % metadata_url logger.error(msg) raise ValueError(msg) def status_update(self, update_status_url, status, revision_id=None, directory_id=None, origin_url=None): """Update the deposit's status.
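For example (payload illustrative), a call sends a json body of the form {'status': status, 'revision_id': ..., 'directory_id': ..., 'origin_url': ...}, where the optional keys are only included when the corresponding arguments are provided.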
Args: update_status_url (str): the full deposit status update url status (str): The status to update the deposit with revision_id (str/None): the revision's identifier to update to directory_id (str/None): the directory's identifier to update to origin_url (str/None): deposit's associated origin url """ payload = {'status': status} if revision_id: payload['revision_id'] = revision_id if directory_id: payload['directory_id'] = directory_id if origin_url: payload['origin_url'] = origin_url self.do('put', update_status_url, json=payload) def check(self, check_url): """Check the deposit's associated data (metadata, archive(s)) Args: check_url (str): the full deposit's check url """ r = self.do('get', check_url) if r.ok: data = r.json() return data['status'] msg = 'Problem when checking deposit %s' % check_url logger.error(msg) raise ValueError(msg) class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta): """Base Deposit client to access the public api. """ def __init__(self, config, error_msg=None, empty_result={}): super().__init__(config) self.error_msg = error_msg self.empty_result = empty_result @abstractmethod def compute_url(self, *args, **kwargs): """Compute api url endpoint to query.""" pass @abstractmethod def compute_method(self, *args, **kwargs): """Http method to use on the url""" pass @abstractmethod def parse_result_ok(self, xml_content): """Given an xml result from the api endpoint, parse it and return a dict. """ pass def compute_information(self, *args, **kwargs): """Compute some more information given the inputs (e.g. http headers, ...) """ return {} def parse_result_error(self, xml_content): """Given an error response in xml, parse it into a dict. Returns: dict with the following keys: 'summary': The error message 'detail', 'sword:verboseDescription': More detail about the error, if any """ return _parse_with_filter(xml_content, keys=[ 'summary', 'detail', 'sword:verboseDescription']) def do_execute(self, method, url, info): """Execute the http query to url using method and info information. By default, execute a simple query to url with the http method. Override this in daughter class to improve the default behavior if needed. """ return self.do(method, url) def execute(self, *args, **kwargs): """Main endpoint to prepare and execute the http query to the api. """ url = self.compute_url(*args, **kwargs) method = self.compute_method(*args, **kwargs) info = self.compute_information(*args, **kwargs) try: r = self.do_execute(method, url, info) except Exception as e: msg = self.error_msg % (url, e) r = self.empty_result r.update({ 'error': msg, }) return r else: if r.ok: if int(r.status_code) == 204: # 204 returns no body return {'status': r.status_code} else: return self.parse_result_ok(r.text) else: error = self.parse_result_error(r.text) empty = self.empty_result error.update(empty) error.update({ 'status': r.status_code, }) return error class ServiceDocumentDepositClient(BaseDepositClient): """Service Document information retrieval. """ def __init__(self, config): super().__init__(config, error_msg='Service document failure at %s: %s', empty_result={'collection': None}) def compute_url(self, *args, **kwargs): return '/servicedocument/' def compute_method(self, *args, **kwargs): return 'get' def parse_result_ok(self, xml_content): """Parse service document's success response. """ return _parse(xml_content) class StatusDepositClient(BaseDepositClient): """Status information on a deposit.
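The parsed result exposes keys such as deposit_id, deposit_status, deposit_status_detail and deposit_swh_id (see parse_result_ok below).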
""" def __init__(self, config): super().__init__(config, error_msg='Status check failure at %s: %s', empty_result={ 'deposit_status': None, 'deposit_status_detail': None, 'deposit_swh_id': None, }) def compute_url(self, collection, deposit_id): return '/%s/%s/status/' % (collection, deposit_id) def compute_method(self, *args, **kwargs): return 'get' def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter(xml_content, keys=[ 'deposit_id', 'deposit_status', 'deposit_status_detail', 'deposit_swh_id', 'deposit_swh_id_context', 'deposit_swh_anchor_id', 'deposit_swh_anchor_id_context', 'deposit_external_id', ]) class BaseCreateDepositClient(BaseDepositClient): """Deposit client base class to post new deposit. """ def __init__(self, config): super().__init__(config, error_msg='Post Deposit failure at %s: %s', empty_result={ 'deposit_id': None, 'deposit_status': None, }) def compute_url(self, collection, *args, **kwargs): return '/%s/' % collection def compute_method(self, *args, **kwargs): return 'post' def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ return _parse_with_filter(xml_content, keys=['deposit_id', 'deposit_status', 'deposit_status_detail', 'deposit_date']) def _compute_information(self, collection, filepath, in_progress, slug, is_archive=True): """Given a filepath, compute necessary information on that file. Args: filepath (str): Path to a file is_archive (bool): is it an archive or not? Returns: dict with keys: 'content-type': content type associated 'md5sum': md5 sum 'filename': filename """ filename = os.path.basename(filepath) if is_archive: md5sum = hashlib.md5(open(filepath, 'rb').read()).hexdigest() extension = filename.split('.')[-1] if 'zip' in extension: content_type = 'application/zip' else: content_type = 'application/x-tar' else: content_type = None md5sum = None return { 'slug': slug, 'in_progress': in_progress, 'content-type': content_type, 'md5sum': md5sum, 'filename': filename, 'filepath': filepath, } def compute_information(self, collection, filepath, in_progress, slug, is_archive=True, **kwargs): info = self._compute_information(collection, filepath, in_progress, slug, is_archive=is_archive) info['headers'] = self.compute_headers(info) return info def do_execute(self, method, url, info): with open(info['filepath'], 'rb') as f: return self.do(method, url, data=f, headers=info['headers']) class CreateArchiveDepositClient(BaseCreateDepositClient): """Post an archive (binary) deposit client.""" def compute_headers(self, info): return { 'SLUG': info['slug'], 'CONTENT_MD5': info['md5sum'], 'IN-PROGRESS': str(info['in_progress']), 'CONTENT-TYPE': info['content-type'], 'CONTENT-DISPOSITION': 'attachment; filename=%s' % ( info['filename'], ), } class UpdateArchiveDepositClient(CreateArchiveDepositClient): """Update (add/replace) an archive (binary) deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return '/%s/%s/media/' % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return 'put' if replace else 'post' class CreateMetadataDepositClient(BaseCreateDepositClient): """Post a metadata deposit client.""" def compute_headers(self, info): return { 'SLUG': info['slug'], 'IN-PROGRESS': str(info['in_progress']), 'CONTENT-TYPE': 'application/atom+xml;type=entry', } class UpdateMetadataDepositClient(CreateMetadataDepositClient): """Update (add/replace) a metadata deposit client.""" def 
compute_url(self, collection, *args, deposit_id=None, **kwargs): return '/%s/%s/metadata/' % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return 'put' if replace else 'post' class CreateMultipartDepositClient(BaseCreateDepositClient): """Create a multipart deposit client.""" def _multipart_info(self, info, info_meta): files = [ ('file', (info['filename'], open(info['filepath'], 'rb'), info['content-type'])), ('atom', (info_meta['filename'], open(info_meta['filepath'], 'rb'), 'application/atom+xml')), ] headers = { 'SLUG': info['slug'], 'CONTENT_MD5': info['md5sum'], 'IN-PROGRESS': str(info['in_progress']), } return files, headers def compute_information(self, collection, archive, metadata, in_progress, slug, **kwargs): info = self._compute_information( collection, archive, in_progress, slug) info_meta = self._compute_information( collection, metadata, in_progress, slug, is_archive=False) files, headers = self._multipart_info(info, info_meta) return {'files': files, 'headers': headers} def do_execute(self, method, url, info): return self.do( method, url, files=info['files'], headers=info['headers']) class UpdateMultipartDepositClient(CreateMultipartDepositClient): """Update a multipart deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return '/%s/%s/metadata/' % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return 'put' if replace else 'post' class PublicApiDepositClient(BaseApiDepositClient): """Public api deposit client.""" def service_document(self): """Retrieve service document endpoint's information.""" return ServiceDocumentDepositClient(self.config).execute() def deposit_status(self, collection, deposit_id): """Retrieve status information on a deposit.""" return StatusDepositClient(self.config).execute( collection, deposit_id) def deposit_create(self, collection, slug, archive=None, metadata=None, in_progress=False): """Create a new deposit (archive, metadata, both as multipart).""" if archive and not metadata: return CreateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug) elif not archive and metadata: return CreateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, is_archive=False) else: return CreateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug) def deposit_update(self, collection, deposit_id, slug, archive=None, metadata=None, in_progress=False, replace=False): """Update (add/replace) existing deposit (archive, metadata, both).""" r = self.deposit_status(collection, deposit_id) if 'error' in r: return r status = r['deposit_status'] if status != 'partial': return { 'error': "You can only act on deposit with status 'partial'", 'detail': "The deposit %s has status '%s'" % ( deposit_id, status), 'deposit_status': status, 'deposit_id': deposit_id, } if archive and not metadata: r = UpdateArchiveDepositClient(self.config).execute( collection, archive, in_progress, slug, deposit_id=deposit_id, replace=replace) elif not archive and metadata: r = UpdateMetadataDepositClient(self.config).execute( collection, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace) else: r = UpdateMultipartDepositClient(self.config).execute( collection, archive, metadata, in_progress, slug, deposit_id=deposit_id, replace=replace) if 'error' in r: return r return self.deposit_status(collection, deposit_id) diff --git a/swh/deposit/models.py b/swh/deposit/models.py index 
e7c5440d..1a7a78ea 100644 --- a/swh/deposit/models.py +++ b/swh/deposit/models.py @@ -1,223 +1,228 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Generated from: # cd swh_deposit && \ # python3 -m manage inspectdb from django.contrib.postgres.fields import JSONField, ArrayField from django.contrib.auth.models import User, UserManager from django.db import models from django.utils.timezone import now from .config import ( DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_REJECTED, ARCHIVE_TYPE, METADATA_TYPE ) class Dbversion(models.Model): """Db version """ version = models.IntegerField(primary_key=True) release = models.DateTimeField(default=now, null=True) description = models.TextField(blank=True, null=True) class Meta: db_table = 'dbversion' def __str__(self): return str({ 'version': self.version, 'release': self.release, 'description': self.description }) """Possible status""" DEPOSIT_STATUS = [ (DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_PARTIAL), ('expired', 'expired'), (DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_DEPOSITED), (DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_VERIFIED), (DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_REJECTED), ('loading', 'loading'), (DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_SUCCESS), (DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_FAILURE), ] """Possible status and the detailed meaning.""" DEPOSIT_STATUS_DETAIL = { DEPOSIT_STATUS_PARTIAL: 'Deposit is partially received. To finalize it, ' 'In-Progress header should be false', 'expired': 'Deposit has been there too long and is now ' 'deemed ready to be garbage collected', DEPOSIT_STATUS_DEPOSITED: 'Deposit is ready for additional checks ' '(tarball ok, metadata, etc...)', DEPOSIT_STATUS_VERIFIED: 'Deposit is fully received, checked, and ' 'ready for loading', DEPOSIT_STATUS_REJECTED: 'Deposit failed the checks', 'loading': "Loading is ongoing on swh's side", DEPOSIT_STATUS_LOAD_SUCCESS: 'The deposit has been successfully ' 'loaded into the Software Heritage archive', DEPOSIT_STATUS_LOAD_FAILURE: 'The deposit loading into the ' 'Software Heritage archive failed', } class DepositClient(User): """Deposit client """ collections = ArrayField(models.IntegerField(), null=True) objects = UserManager() # type: ignore # this typing hint is due to a mypy/django-stubs limitation, # see https://github.com/typeddjango/django-stubs/issues/174 provider_url = models.TextField(null=False) domain = models.TextField(null=False) class Meta: db_table = 'deposit_client' def __str__(self): return str({ 'id': self.id, 'collections': self.collections, 'username': super().username, 'domain': self.domain, 'provider_url': self.provider_url, }) class Deposit(models.Model): """Deposit reception table """ id = models.BigAutoField(primary_key=True) # First deposit reception date reception_date = models.DateTimeField(auto_now_add=True) # Date when the deposit is deemed complete and ready for loading complete_date = models.DateTimeField(null=True) # collection concerned by the deposit collection = models.ForeignKey( 'DepositCollection', models.DO_NOTHING) # Deposit's external identifier external_id = models.TextField() # Deposit client client = models.ForeignKey('DepositClient', models.DO_NOTHING) # SWH's loading result identifier swh_id = 
    swh_id = models.TextField(blank=True, null=True)
    swh_id_context = models.TextField(blank=True, null=True)
    swh_anchor_id = models.TextField(blank=True, null=True)
    swh_anchor_id_context = models.TextField(blank=True, null=True)

    # Deposit's status regarding loading
    status = models.TextField(
        choices=DEPOSIT_STATUS,
        default=DEPOSIT_STATUS_PARTIAL)

    status_detail = JSONField(null=True)

    # deposit can have one parent
    parent = models.ForeignKey('self', on_delete=models.PROTECT, null=True)

    check_task_id = models.TextField(
        blank=True, null=True,
        verbose_name="Scheduler's associated checking task id"
    )

    load_task_id = models.TextField(
        blank=True, null=True,
        verbose_name="Scheduler's associated loading task id"
    )

    class Meta:
        db_table = 'deposit'

    def __str__(self):
        d = {
            'id': self.id,
            'reception_date': self.reception_date,
            'collection': self.collection.name,
            'external_id': self.external_id,
            'client': self.client.username,
            'status': self.status,
        }

        if self.status in (DEPOSIT_STATUS_REJECTED):
            d['status_detail'] = self.status_detail

        return str(d)

+    @property
+    def origin_url(self):
+        return '%s/%s' % (self.client.provider_url.rstrip('/'),
+                          self.external_id)
+

def client_directory_path(instance, filename):
    """Callable to upload archive in MEDIA_ROOT/user_<id>/<filename>

    Args:
        instance (DepositRequest): DepositRequest concerned by the upload
        filename (str): Filename of the uploaded file

    Returns:
        A path to be prefixed by the MEDIA_ROOT to access physically
        to the file uploaded.

    """
    return 'client_{0}/{1}'.format(instance.deposit.client.id, filename)


REQUEST_TYPES = [(ARCHIVE_TYPE, ARCHIVE_TYPE),
                 (METADATA_TYPE, METADATA_TYPE)]


class DepositRequest(models.Model):
    """Deposit request associated to one deposit.

    """
    id = models.BigAutoField(primary_key=True)

    # Deposit concerned by the request
    deposit = models.ForeignKey(Deposit, models.DO_NOTHING)

    date = models.DateTimeField(auto_now_add=True)

    # Deposit request information on the data to inject
    # this can be null when type is 'archive'
    metadata = JSONField(null=True)
    raw_metadata = models.TextField(null=True)

    # this can be null when type is 'metadata'
    archive = models.FileField(null=True, upload_to=client_directory_path)

    type = models.CharField(max_length=8, choices=REQUEST_TYPES, null=True)

    class Meta:
        db_table = 'deposit_request'

    def __str__(self):
        meta = None
        if self.metadata:
            from json import dumps
            meta = dumps(self.metadata)

        archive_name = None
        if self.archive:
            archive_name = self.archive.name

        return str({
            'id': self.id,
            'deposit': self.deposit,
            'metadata': meta,
            'archive': archive_name
        })


class DepositCollection(models.Model):
    id = models.BigAutoField(primary_key=True)

    # Human readable name for the collection type e.g HAL, arXiv, etc...
    name = models.TextField()

    class Meta:
        db_table = 'deposit_collection'

    def __str__(self):
        return str({'id': self.id, 'name': self.name})
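The origin_url property added above computes the deposit's origin URL from the client's provider_url and the deposit's external_id, replacing the utils.origin_url_from helper removed later in this patch. A small sketch, mirroring the removed test_origin_url_from test (unsaved model instances, illustrative identifiers):

    # Illustrative only: instances are never saved; ids are made up.
    from swh.deposit.models import Deposit, DepositClient

    deposit = Deposit(
        client=DepositClient(provider_url='https://hal.example.org/'),
        external_id='hal-01243573',
    )
    # The trailing slash on provider_url is stripped before joining.
    assert deposit.origin_url == 'https://hal.example.org/hal-01243573'

Unlike the removed helper, the property does not assert that provider_url and external_id are set; a missing provider_url surfaces as an AttributeError and a missing external_id ends up as a literal 'None' path segment.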
diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py
index 1c90113a..50f0b1d0 100644
--- a/swh/deposit/tests/api/test_deposit_private_check.py
+++ b/swh/deposit/tests/api/test_deposit_private_check.py
@@ -1,263 +1,263 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from django.urls import reverse
from rest_framework import status

from swh.deposit.config import (
    DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT,
    DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, COL_IRI
)
from swh.deposit.api.private.deposit_check import (
    MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING,
    MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING,
    MANDATORY_ARCHIVE_MISSING
)
from swh.deposit.models import Deposit
from swh.deposit.parsers import parse_xml
from swh.deposit.tests.common import (
    create_arborescence_archive, create_archive_with_archive
)


PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + '-nc'


def private_check_url_endpoints(collection, deposit):
    """There are 2 endpoints to check (one with collection, one without)"""
    return [
        reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]),
        reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id])
    ]


def test_deposit_ok(
        authenticated_client, deposit_collection, ready_deposit_ok):
    """Proper deposit should succeed the checks (-> status ready)

    """
    deposit = ready_deposit_ok
    for url in private_check_url_endpoints(deposit_collection, deposit):
        response = authenticated_client.get(url)

        assert response.status_code == status.HTTP_200_OK
        data = response.json()
        assert data['status'] == DEPOSIT_STATUS_VERIFIED
        deposit = Deposit.objects.get(pk=deposit.id)
        assert deposit.status == DEPOSIT_STATUS_VERIFIED

        deposit.status = DEPOSIT_STATUS_DEPOSITED
        deposit.save()


def test_deposit_invalid_tarball(
        tmp_path, authenticated_client, deposit_collection):
    """Deposit with tarball (of 1 tarball) should fail the checks: rejected

    """
    for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']:
        deposit = create_deposit_archive_with_archive(
            tmp_path, archive_extension,
            authenticated_client, deposit_collection.name)

        for url in private_check_url_endpoints(deposit_collection, deposit):
            response = authenticated_client.get(url)
            assert response.status_code == status.HTTP_200_OK
            data = response.json()

            assert data['status'] == DEPOSIT_STATUS_REJECTED
            details = data['details']

            # archive checks failure
            assert len(details['archive']) == 1
            assert details['archive'][0]['summary'] == \
                MANDATORY_ARCHIVE_INVALID

            deposit = Deposit.objects.get(pk=deposit.id)
            assert deposit.status == DEPOSIT_STATUS_REJECTED


def test_deposit_ko_missing_tarball(
        authenticated_client, deposit_collection,
        ready_deposit_only_metadata):
    """Deposit without archive should fail the checks: rejected

    """
    deposit = ready_deposit_only_metadata
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED

    for url in private_check_url_endpoints(deposit_collection, deposit):
        response = authenticated_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        data = response.json()

        assert data['status'] == DEPOSIT_STATUS_REJECTED
        details = data['details']
        # archive checks failure
        assert len(details['archive']) == 1
        assert details['archive'][0]['summary'] == MANDATORY_ARCHIVE_MISSING

        deposit = Deposit.objects.get(pk=deposit.id)
        assert deposit.status == DEPOSIT_STATUS_REJECTED
        deposit.status = DEPOSIT_STATUS_DEPOSITED
        deposit.save()


def test_deposit_ko_unsupported_tarball(
        tmp_path, authenticated_client, deposit_collection,
        ready_deposit_invalid_archive):
    """Deposit with an unsupported tarball should fail the checks: rejected

    """
    deposit = ready_deposit_invalid_archive
    assert DEPOSIT_STATUS_DEPOSITED == deposit.status

    for url in private_check_url_endpoints(deposit_collection, deposit):
        response = authenticated_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        data = response.json()

        assert data['status'] == DEPOSIT_STATUS_REJECTED
        details = data['details']

        # archive checks failure
        assert len(details['archive']) == 1
        assert details['archive'][0]['summary'] == \
            MANDATORY_ARCHIVE_UNSUPPORTED
        # metadata check failure
        assert len(details['metadata']) == 2
        mandatory = details['metadata'][0]
        assert mandatory['summary'] == MANDATORY_FIELDS_MISSING
        assert set(mandatory['fields']) == set(['author'])
        alternate = details['metadata'][1]
        assert alternate['summary'] == ALTERNATE_FIELDS_MISSING
        assert alternate['fields'] == ['name or title']

        deposit = Deposit.objects.get(pk=deposit.id)
        assert deposit.status == DEPOSIT_STATUS_REJECTED
        deposit.status = DEPOSIT_STATUS_DEPOSITED
        deposit.save()


def test_check_deposit_metadata_ok(
        authenticated_client, deposit_collection, ready_deposit_ok):
    """Proper deposit should succeed the checks (-> status ready)
       with all **MUST** metadata

       using the codemeta metadata test set
    """
    deposit = ready_deposit_ok
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED

    for url in private_check_url_endpoints(deposit_collection, deposit):
        response = authenticated_client.get(url)

        assert response.status_code == status.HTTP_200_OK
        data = response.json()

        assert data['status'] == DEPOSIT_STATUS_VERIFIED
        deposit = Deposit.objects.get(pk=deposit.id)
        assert deposit.status == DEPOSIT_STATUS_VERIFIED

        deposit.status = DEPOSIT_STATUS_DEPOSITED
        deposit.save()


def test_check_metadata_ok(swh_checks_deposit):
    actual_check, detail = swh_checks_deposit._check_metadata({
        'url': 'something',
        'external_identifier': 'something-else',
        'name': 'foo',
        'author': 'someone',
    })

    assert actual_check is True
    assert detail is None


def test_check_metadata_ok2(swh_checks_deposit):
    actual_check, detail = swh_checks_deposit._check_metadata({
        'url': 'something',
        'external_identifier': 'something-else',
        'title': 'bar',
        'author': 'someone',
    })

    assert actual_check is True
    assert detail is None


def test_check_metadata_ko(swh_checks_deposit):
    """Missing mandatory alternate field should be caught

    """
    actual_check, error_detail = swh_checks_deposit._check_metadata({
        'url': 'something',
        'external_identifier': 'something-else',
        'author': 'someone',
    })

    expected_error = {
        'metadata': [{
            'summary': 'Mandatory alternate fields are missing',
            'fields': ['name or title'],
        }]
    }
    assert actual_check is False
    assert error_detail == expected_error


def test_check_metadata_ko2(swh_checks_deposit):
    """Missing mandatory fields should be caught

    """
    actual_check, error_detail = swh_checks_deposit._check_metadata({
        'url': 'something',
        'external_identifier': 'something-else',
        'title': 'foobar',
    })

    expected_error = {
        'metadata': [{
            'summary': 'Mandatory fields are missing',
            'fields': ['author'],
        }]
    }
    assert actual_check is False
    assert error_detail == expected_error


def create_deposit_archive_with_archive(
        root_path, archive_extension, client, collection_name):
    # we create the holding archive to a given extension
    archive = create_arborescence_archive(
        root_path, 'archive1', 'file1', b'some content in file',
        extension=archive_extension)

    # now we create an archive holding the first created archive
    invalid_archive = create_archive_with_archive(
-        root_path, 'invalid.tar.gz', archive)
+        root_path, 'invalid.tgz', archive)

    # we deposit it
    response = client.post(
        reverse(COL_IRI, args=[collection_name]),
        content_type='application/x-tar',
        data=invalid_archive['data'],
        CONTENT_LENGTH=invalid_archive['length'],
        HTTP_MD5SUM=invalid_archive['md5sum'],
        HTTP_SLUG='external-id',
        HTTP_IN_PROGRESS=False,
        HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
            invalid_archive['name'], ))

    # then
    assert response.status_code == status.HTTP_201_CREATED
    response_content = parse_xml(response.content)
    deposit_status = response_content['deposit_status']
    assert deposit_status == DEPOSIT_STATUS_DEPOSITED
    deposit_id = int(response_content['deposit_id'])

    deposit = Deposit.objects.get(pk=deposit_id)
    assert DEPOSIT_STATUS_DEPOSITED == deposit.status
    return deposit
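For quick reference, the rejection payload exercised by these tests has the following shape, reconstructed from the assertions above; the values are placeholders, not captured server output:

    # Illustrative shape of a rejection response body (not actual output).
    {
        'status': DEPOSIT_STATUS_REJECTED,
        'details': {
            'archive': [{'summary': MANDATORY_ARCHIVE_UNSUPPORTED}],
            'metadata': [
                {'summary': MANDATORY_FIELDS_MISSING,
                 'fields': ['author']},
                {'summary': ALTERNATE_FIELDS_MISSING,
                 'fields': ['name or title']},
            ],
        },
    }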
diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py
index f44710a6..a7486f7e 100644
--- a/swh/deposit/tests/test_utils.py
+++ b/swh/deposit/tests/test_utils.py
@@ -1,215 +1,178 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import pytest

from unittest.mock import patch

from swh.deposit import utils
-from swh.deposit.models import Deposit, DepositClient
-
-
-def test_origin_url_from():
-    """With correctly setup-ed deposit, all is fine
-
-    """
-    for provider_url, external_id in (
-            ('http://somewhere.org', 'uuid'),
-            ('http://overthejungle.org', 'diuu'),
-    ):
-        deposit = Deposit(
-            client=DepositClient(provider_url=provider_url),
-            external_id=external_id
-        )
-
-        actual_origin_url = utils.origin_url_from(deposit)
-
-        assert actual_origin_url == '%s/%s' % (
-            provider_url.rstrip('/'), external_id)
-
-
-def test_origin_url_from_ko():
-    """Badly configured deposit should raise
-
-    """
-    for provider_url, external_id in (
-            (None, 'uuid'),
-            ('http://overthejungle.org', None),
-    ):
-        deposit = Deposit(
-            client=DepositClient(provider_url=provider_url),
-            external_id=None
-        )
-
-        with pytest.raises(AssertionError):
-            utils.origin_url_from(deposit)


def test_merge():
    """Calling utils.merge on dicts should merge without losing information

    """
    d0 = {
        'author': 'someone',
        'license': [['gpl2']],
        'a': 1
    }
    d1 = {
        'author': ['author0', {'name': 'author1'}],
        'license': [['gpl3']],
        'b': {
            '1': '2'
        }
    }
    d2 = {
        'author': map(lambda x: x, ['else']),
        'license': 'mit',
        'b': {
            '2': '3',
        }
    }
    d3 = {
        'author': (v for v in ['no one']),
    }

    actual_merge = utils.merge(d0, d1, d2, d3)

    expected_merge = {
        'a': 1,
        'license': [['gpl2'], ['gpl3'], 'mit'],
        'author': [
            'someone', 'author0', {'name': 'author1'}, 'else', 'no one'],
        'b': {
            '1': '2',
            '2': '3',
        }
    }
    assert actual_merge == expected_merge


def test_merge_2():
    d0 = {
        'license': 'gpl2',
        'runtime': {
            'os': 'unix derivative'
        }
    }
    d1 = {
        'license': 'gpl3',
        'runtime': 'GNU/Linux'
    }

    expected = {
        'license': ['gpl2', 'gpl3'],
        'runtime': [
            {
                'os': 'unix derivative'
            },
            'GNU/Linux'
        ],
    }

    actual = utils.merge(d0, d1)
    assert actual == expected


def test_merge_edge_cases():
    input_dict = {
        'license': ['gpl2', 'gpl3'],
        'runtime': [
            {
                'os': 'unix derivative'
            },
            'GNU/Linux'
        ],
    }
    # against empty dict
    actual = utils.merge(input_dict, {})
    assert actual == input_dict

    # against oneself
    actual = utils.merge(input_dict, input_dict, input_dict)
    assert actual == input_dict


def test_merge_one_dict():
    """Merge one dict should result in the same dict value

    """
    input_and_expected = {'anything': 'really'}
    actual = utils.merge(input_and_expected)
    assert actual == input_and_expected


def test_merge_raise():
    """Calling utils.merge with any non-dict argument should raise

    """
    d0 = {
        'author': 'someone',
        'a': 1
    }
    d1 = ['not a dict']

    with pytest.raises(ValueError):
        utils.merge(d0, d1)

    with pytest.raises(ValueError):
        utils.merge(d1, d0)

    with pytest.raises(ValueError):
        utils.merge(d1)

    assert utils.merge(d0) == d0


@patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
def test_normalize_date_0(mock_normalize):
    """When date is a list, choose the first date and normalize it

    Note: We do not test swh.model.identifiers which is already tested
    in swh.model

    """
    actual_date = utils.normalize_date(['2017-10-12', 'date1'])

    expected_date = '2017-10-12 00:00:00+00:00'

    assert str(actual_date) == expected_date


@patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
def test_normalize_date_1(mock_normalize):
    """Providing a date in a reasonable format, everything is fine

    Note: We do not test swh.model.identifiers which is already tested
    in swh.model

    """
    actual_date = utils.normalize_date('2018-06-11 17:02:02')

    expected_date = '2018-06-11 17:02:02+00:00'

    assert str(actual_date) == expected_date


@patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
def test_normalize_date_doing_irrelevant_stuff(mock_normalize):
    """Providing a date with only the year results in a reasonable date

    Note: We do not test swh.model.identifiers which is already tested
    in swh.model

    """
    actual_date = utils.normalize_date('2017')

    expected_date = '2017-01-01 00:00:00+00:00'

    assert str(actual_date) == expected_date
""" if isinstance(value, (list, map, GeneratorType)): vals = value else: vals = [value] for v in vals: if v in existing_val: continue existing_val.append(v) return existing_val d = {} for data in dicts: if not isinstance(data, dict): raise ValueError( 'dicts is supposed to be a variable arguments of dict') for key, value in data.items(): existing_val = d.get(key) if not existing_val: d[key] = value continue if isinstance(existing_val, (list, map, GeneratorType)): new_val = _extend(existing_val, value) elif isinstance(existing_val, dict): if isinstance(value, dict): new_val = merge(existing_val, value) else: new_val = _extend([existing_val], value) else: new_val = _extend([existing_val], value) d[key] = new_val return d def normalize_date(date): """Normalize date fields as expected by swh workers. If date is a list, elect arbitrarily the first element of that list If date is (then) a string, parse it through dateutil.parser.parse to extract a datetime. Then normalize it through swh.model.identifiers.normalize_timestamp. Returns The swh date object """ if isinstance(date, list): date = date[0] if isinstance(date, str): date = iso8601.parse_date(date) return normalize_timestamp(date)