Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/tasks.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from typing import Optional | from typing import Optional | ||||
from celery import shared_task | from celery import shared_task | ||||
import iso8601 | import iso8601 | ||||
from .loader import CvsLoader | from .loader import CvsLoader | ||||
def convert_to_datetime(date: Optional[str]) -> Optional[datetime]: | def convert_to_datetime(date: Optional[str]) -> Optional[datetime]: | ||||
if date is None: | |||||
return None | |||||
try: | try: | ||||
assert isinstance(date, str) | |||||
return iso8601.parse_date(date) | return iso8601.parse_date(date) | ||||
except Exception: | except Exception: | ||||
return None | return None | ||||
@shared_task(name=__name__ + ".LoadCvsRepository") | @shared_task(name=__name__ + ".LoadCvsRepository") | ||||
def load_cvs( | def load_cvs( | ||||
*, | *, | ||||
Show All 27 Lines |