diff --git a/swh/loader/cvs/tasks.py b/swh/loader/cvs/tasks.py --- a/swh/loader/cvs/tasks.py +++ b/swh/loader/cvs/tasks.py @@ -1,45 +1,23 @@ -# Copyright (C) 2015-2021 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime -from typing import Optional - from celery import shared_task -import iso8601 + +from swh.loader.core.utils import parse_visit_date from .loader import CvsLoader -def convert_to_datetime(date: Optional[str]) -> Optional[datetime]: - if date is None: - return None - try: - assert isinstance(date, str) - return iso8601.parse_date(date) - except Exception: - return None +def _process_kwargs(kwargs): + if "visit_date" in kwargs: + kwargs["visit_date"] = parse_visit_date(kwargs["visit_date"]) + return kwargs @shared_task(name=__name__ + ".LoadCvsRepository") -def load_cvs( - *, - url: str, - origin_url: Optional[str] = None, - visit_date: Optional[str] = None, -): - """Import a CVS repository - - Args: - - url: (mandatory) CVS's repository url to ingest data from - - origin_url: Optional original url override to use as origin reference - in the archive. If not provided, "url" is used as origin. - - visit_date: Optional date to override the visit date - """ - loader = CvsLoader.from_configfile( - url=url, - origin_url=origin_url, - visit_date=convert_to_datetime(visit_date), - ) +def load_cvs(**kwargs): + """Import a CVS repository""" + loader = CvsLoader.from_configfile(**_process_kwargs(kwargs)) return loader.load() diff --git a/swh/loader/cvs/tests/test_tasks.py b/swh/loader/cvs/tests/test_tasks.py --- a/swh/loader/cvs/tests/test_tasks.py +++ b/swh/loader/cvs/tests/test_tasks.py @@ -1,42 +1,65 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime, timezone +import uuid import pytest -from swh.loader.cvs.tasks import convert_to_datetime +from swh.scheduler.model import ListedOrigin, Lister +from swh.scheduler.utils import create_origin_task_dict -@pytest.mark.parametrize( - "date,expected_result", - [ - (None, None), - ( - "2021-11-23 09:41:02.434195+00:00", - datetime(2021, 11, 23, 9, 41, 2, 434195, tzinfo=timezone.utc), - ), - ( - "23112021", - None, - ), # failure to parse - ], -) -def test_convert_to_datetime(date, expected_result): - assert convert_to_datetime(date) == expected_result +@pytest.fixture(autouse=True) +def celery_worker_and_swh_config(swh_scheduler_celery_worker, swh_config): + pass + + +@pytest.fixture +def cvs_lister(): + return Lister(name="cvs-lister", instance_name="example", id=uuid.uuid4()) + + +@pytest.fixture +def cvs_listed_origin(cvs_lister): + return ListedOrigin( + lister_id=cvs_lister.id, url="https://cvs.example.org/repo", visit_type="cvs" + ) def test_cvs_loader( - mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config + mocker, + swh_scheduler_celery_app, ): mock_loader = mocker.patch("swh.loader.cvs.loader.CvsLoader.load") mock_loader.return_value = {"status": "eventful"} res = swh_scheduler_celery_app.send_task( "swh.loader.cvs.tasks.LoadCvsRepository", - kwargs=dict(url="some-technical-url", origin_url="origin-url"), + kwargs=dict( + url="some-technical-url", origin_url="origin-url", visit_date="now" + ), + ) + assert res + res.wait() + assert res.successful() + + assert res.result == {"status": "eventful"} + assert mock_loader.called + + +def test_cvs_loader_for_listed_origin( + mocker, swh_scheduler_celery_app, cvs_lister, cvs_listed_origin +): + mock_loader = mocker.patch("swh.loader.cvs.loader.CvsLoader.load") + mock_loader.return_value = {"status": "eventful"} + + task_dict = create_origin_task_dict(cvs_listed_origin, cvs_lister) + + res = swh_scheduler_celery_app.send_task( + "swh.loader.cvs.tasks.LoadCvsRepository", + kwargs=task_dict["arguments"]["kwargs"], ) assert res res.wait()