diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py index ae00645..9959287 100644 --- a/swh/scheduler/tests/test_cli_journal.py +++ b/swh/scheduler/tests/test_cli_journal.py @@ -1,114 +1,114 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Dict, List from click.testing import CliRunner, Result from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import value_to_kafka from swh.scheduler import get_scheduler from swh.scheduler.cli import cli -from swh.scheduler.tests.test_journal_client import VISIT_STATUSES1 +from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1 @pytest.fixture def swh_scheduler_cfg(postgresql_scheduler, kafka_server): """Journal client configuration ready""" return { "scheduler": {"cls": "local", "db": postgresql_scheduler.dsn,}, "journal": { "brokers": [kafka_server], "group_id": "test-consume-visit-status", }, } def _write_configuration_path(config: Dict, tmp_path: str) -> str: config_path = os.path.join(str(tmp_path), "scheduler.yml") with open(config_path, "w") as f: f.write(yaml.dump(config)) return config_path @pytest.fixture def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path): """Write scheduler configuration in temporary path and returns such path""" return _write_configuration_path(swh_scheduler_cfg, tmp_path) def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: """Invoke swh scheduler journal subcommands """ runner = CliRunner() result = runner.invoke(cli, ["-C" + config_path] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler( swh_scheduler_cfg, tmp_path ): config = swh_scheduler_cfg.copy() config["scheduler"] = {"cls": "foo"} config_path = _write_configuration_path(config, tmp_path) with pytest.raises(ValueError, match="must be instantiated"): invoke( ["journal-client", "--stop-after-objects", "1",], config_path, ) def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf( swh_scheduler_cfg, tmp_path ): config = swh_scheduler_cfg.copy() config.pop("journal", None) config_path = _write_configuration_path(config, tmp_path) with pytest.raises(ValueError, match="Missing 'journal'"): invoke( ["journal-client", "--stop-after-objects", "1",], config_path, ) def test_cli_journal_client_origin_visit_status( swh_scheduler_cfg, swh_scheduler_cfg_path, ): kafka_server = swh_scheduler_cfg["journal"]["brokers"][0] swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test visit-stats producer", "acks": "all", } ) - visit_status = VISIT_STATUSES1[0] + visit_status = VISIT_STATUSES_1[0] value = value_to_kafka(visit_status) topic = "swh.journal.objects.origin_visit_status" producer.produce(topic=topic, key=b"bogus-origin", value=value) producer.flush() result = invoke( ["journal-client", "--stop-after-objects", "1",], swh_scheduler_cfg_path, ) # Check the output expected_output = "Processed 1 message(s).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output actual_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_visit_stats is not None diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 3fa91be..ffc3923 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,596 +1,540 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools from itertools import permutations import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import max_date, process_journal_objects from swh.scheduler.model import ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( "dates,expected_max_date", [ ((DATE1,), DATE1), ((None, DATE2), DATE2), ((DATE1, None), DATE1), ((DATE1, DATE2), DATE2), ((DATE2, DATE1), DATE2), ((DATE1, DATE2, DATE3), DATE3), ((None, DATE2, DATE3), DATE3), ((None, None, DATE3), DATE3), ((DATE1, None, DATE3), DATE3), ], ) def test_max_date(dates, expected_max_date): assert max_date(*dates) == expected_max_date def test_max_date_raise(): with pytest.raises(ValueError, match="valid datetime"): max_date() with pytest.raises(ValueError, match="valid datetime"): max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. """ visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # Ensure those visit status are ignored for visit_status in visit_statuses: actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is None def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get("foo", "git") assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], + url="foo", + visit_type="git", last_eventful=None, last_uneventful=None, last_failed=None, last_notfound=visit_status["date"], last_snapshot=None, ) visit_statuses = [ { "origin": "foo", - "visit": 4, + "visit": 3, "status": "not_found", - "date": DATE3, + "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", - "visit": 3, + "visit": 4, "status": "not_found", - "date": DATE2, + "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - for visit_status in visit_statuses: - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) - assert actual_origin_visit_stats is not None - assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=DATE3, - last_snapshot=None, - ) + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get("foo", "git") + assert actual_origin_visit_stats is not None + assert actual_origin_visit_stats == OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=DATE3, + last_snapshot=None, + ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", - "visit": 2, + "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, - ] - - process_journal_objects( - {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler - ) - - # Ensure those visit status are ignored - for visit_status in visit_statuses: - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) - assert actual_origin_visit_stats is not None - assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=None, - last_uneventful=None, - last_failed=visit_status["date"], - last_notfound=None, - last_snapshot=None, - ) - - visit_statuses = [ { "origin": "bar", - "visit": 3, + "visit": 2, "status": "full", - "date": DATE3, + "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "bar", - "visit": 4, + "visit": 3, "status": "full", - "date": DATE2, + "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - for visit_status in visit_statuses: - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) - assert actual_origin_visit_stats is not None - assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=None, - last_uneventful=None, - last_failed=DATE3, - last_notfound=None, - last_snapshot=None, - ) + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get("bar", "git") + assert actual_origin_visit_stats is not None + assert actual_origin_visit_stats == OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=DATE3, + last_notfound=None, + last_snapshot=None, + ) def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", - "visit": 2, + "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, - ] - - process_journal_objects( - {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler - ) - - for visit_status in visit_statuses: - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) - assert actual_origin_visit_stats is not None - assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=visit_status["date"], - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=visit_status["snapshot"], - ) - - visit_statuses = [ { "origin": "foo", - "visit": 4, - "status": "full", - "date": DATE3, + "visit": 2, + "status": "partial", + "date": DATE2, "type": "git", - "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, { "origin": "foo", "visit": 3, - "status": "partial", - "date": DATE2, + "status": "full", + "date": DATE3, "type": "git", - "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), + "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - for visit_status in visit_statuses: - actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( - visit_status["origin"], visit_status["type"] - ) - assert actual_origin_visit_stats is not None - assert actual_origin_visit_stats == OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=DATE3, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), - ) + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get("foo", "git") + assert actual_origin_visit_stats is not None + assert actual_origin_visit_stats == OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE3, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + ) def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=DATE3, last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( visit_status["origin"], visit_status["type"] ) assert actual_origin_visit_stats is not None assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=visit_status["date"], # most recent date but uneventful last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], ) VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="foo", visit_type="git", last_eventful=DATE1 + ONE_DAY, last_uneventful=DATE1 + 3 * ONE_DAY, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) assert swh_scheduler.origin_visit_stats_get("foo", "git") == expected_visit_stats -VISIT_STATUSES1 = [ +VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( - "visit_statuses", permutations(VISIT_STATUSES1, len(VISIT_STATUSES1)) + "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="cavabarder", visit_type="hg", last_eventful=DATE1 + 2 * ONE_DAY, last_uneventful=DATE1 + 3 * ONE_DAY, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ) assert ( swh_scheduler.origin_visit_stats_get("cavabarder", "hg") == expected_visit_stats ) VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get("cavabarder", "hg") is None assert swh_scheduler.origin_visit_stats_get("cavabarder", "git") is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get(url, "git") assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get(url, "hg") assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get("cavabarder", "git") assert ovs.last_eventful == DATE1 + 5 * ONE_DAY assert ovs.last_uneventful is None assert ovs.last_failed is None assert ovs.last_notfound is None assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" )