diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py index d35f183..6a8ce61 100644 --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -1,263 +1,291 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import random from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import LastVisitStatus, OriginVisitStats from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" +DISABLE_ORIGIN_THRESHOLD = 3 +"""Threshold to disable failing origins""" + def max_date(*dates: Optional[datetime]) -> datetime: """Return the max date of given (possibly None) dates At least one date must be not None. """ filtered_dates = [d for d in dates if d is not None] if not filtered_dates: raise ValueError("At least one date should be a valid datetime") return max(filtered_dates) def from_position_offset_to_days(position_offset: int) -> int: """Compute position offset to interval in days. - index 0 and 1: interval 1 day - index 2, 3 and 4: interval 2 days - index 5 and up: interval `4^(n-4)` days for n in (4, 16, 64, 256, 1024, ...) Args: position_offset: The actual position offset for a given visit stats Returns: The offset as an interval in number of days """ assert position_offset >= 0 if position_offset < 2: result = 1 elif position_offset < 5: result = 2 else: result = 4 ** (position_offset - 4) return result def next_visit_queue_position( queue_position_per_visit_type: Dict, visit_stats: Dict ) -> datetime: """Compute the next visit queue position for the given visit_stats. This takes the visit_stats next_position_offset value and compute a corresponding interval in days (with a random fudge factor of -/+ 10% range to avoid scheduling burst for hosters). Then computes out of this visit interval and the current visit stats's position in the queue a new position. As an implementation detail, if the visit stats does not have a queue position yet, this fallbacks to use the current global position (for the same visit type as the visit stats) to compute the new position in the queue. If there is no global state yet for the visit type, this starts up using the ``utcnow`` function as default value. Args: queue_position_per_visit_type: The global state of the queue per visit type visit_stats: The actual visit information to compute the next position for Returns: The actual next visit queue position for that visit stats """ days = from_position_offset_to_days(visit_stats["next_position_offset"]) random_fudge_factor = random.uniform(-0.1, 0.1) visit_interval = timedelta(days=days * (1 + random_fudge_factor)) # Use the current queue position per visit type as starting position if none is # already set default_queue_position = queue_position_per_visit_type.get( visit_stats["visit_type"], utcnow() ) current_position = ( visit_stats["next_visit_queue_position"] if visit_stats.get("next_visit_queue_position") else default_queue_position ) return current_position + visit_interval def get_last_status( incoming_visit_status: Dict, known_visit_stats: Dict ) -> Tuple[LastVisitStatus, Optional[bool]]: """Determine the `last_visit_status` and eventfulness of an origin according to the received visit_status object, and the state of the origin_visit_stats in db. Note that at the time this function is called, out of order messages were already discarded. Thus why the implementation is rather simple. Args: incoming_visit_status: Incoming visit status read ouf of the journal known_visit_stats: Visit stats already registered in the backend Returns: A tuple of (LastVisitStatus, Optional[bool]). LastVisitStatus represents the successfulness of the visit. Optional[bool] represents whether the snapshot is fresher than before (True/False) or None if there is no snapshot at all. """ status = incoming_visit_status["status"] if status in ("not_found", "failed"): return LastVisitStatus(status), None assert status in ("full", "partial") if incoming_visit_status["snapshot"] is None: return LastVisitStatus.failed, None if incoming_visit_status["snapshot"] != known_visit_stats.get("last_snapshot"): return LastVisitStatus.successful, True return LastVisitStatus.successful, False def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: - """Read messages from origin_visit_status journal topic to update "origin_visit_stats" + f"""Read messages from origin_visit_status journal topic to update "origin_visit_stats" information on (origin, visit_type). The goal is to compute visit stats information per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... Details: - This journal consumes origin visit status information for final visit status (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops the information of non final visit statuses (`"ongoing"`, `"created"`). - This journal client only considers messages that arrive in chronological order. Messages that arrive out of order (i.e. with a date field smaller than the latest recorded visit of the origin) are ignored. This is a tradeoff between correctness and simplicity of implementation [1]_. - The snapshot is used to determine the eventful or uneventful nature of the origin visit. - When no snapshot is provided, the visit is considered as failed. - Finally, the `next_visit_queue_position` (time at which some new objects are expected to be added for the origin), and `next_position_offset` (duration that we expect to wait between visits of this origin) are updated. + - When visits fails at least {DISABLE_ORIGIN_THRESHOLD} times in a row, the + origins are disabled in the scheduler table. It's up to the lister to activate + those back when they are listed again. + This is a worker function to be used with `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. .. [1] Ignoring out of order messages makes the initialization of the origin_visit_status table (from a full journal) less deterministic: only the `last_visit`, `last_visit_state` and `last_successful` fields are guaranteed to be exact, the `next_position_offset` field is a best effort estimate (which should converge once the client has run for a while on in-order messages). """ assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } # Use the default values from the model object empty_object = { field.name: field.default if field.default != attr.NOTHING else None for field in attr.fields(OriginVisitStats) } + disabled_urls: List[str] = [] + # Retrieve the global queue state queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] pk = origin, visit_type if pk not in origin_visit_stats: origin_visit_stats[pk] = { **empty_object, "url": origin, "visit_type": visit_type, } visit_stats_d = origin_visit_stats[pk] if ( visit_stats_d.get("last_visit") and msg_dict["date"] <= visit_stats_d["last_visit"] ): # message received out of order, ignore continue # Compare incoming message to known status of the origin, to determine # eventfulness last_visit_status, eventful = get_last_status(msg_dict, visit_stats_d) # Update the position offset according to the visit status, # if we had already visited this origin before. if visit_stats_d.get("last_visit"): # Update the next position offset according to the existing value and the # eventfulness of the visit. increment = -2 if eventful else 1 visit_stats_d["next_position_offset"] = max( 0, visit_stats_d["next_position_offset"] + increment ) # increment the counter when last_visit_status is the same same_visit_status = last_visit_status == visit_stats_d["last_visit_status"] else: same_visit_status = False # Record current visit date as highest known date (we've rejected out of order # messages earlier). visit_stats_d["last_visit"] = msg_dict["date"] visit_stats_d["last_visit_status"] = last_visit_status # Record last successful visit date if last_visit_status == LastVisitStatus.successful: visit_stats_d["last_successful"] = max_date( msg_dict["date"], visit_stats_d.get("last_successful") ) visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Update the next visit queue position (which will be used solely for origin # without any last_update, cf. the dedicated scheduling policy # "origins_without_last_update") visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( queue_position_per_visit_type, visit_stats_d ) visit_stats_d["successive_visits"] = ( visit_stats_d["successive_visits"] + 1 if same_visit_status else 1 ) + # Disable recurring failing/not-found origins + if ( + visit_stats_d["last_visit_status"] + in [LastVisitStatus.not_found, LastVisitStatus.failed] + ) and visit_stats_d["successive_visits"] >= DISABLE_ORIGIN_THRESHOLD: + disabled_urls.append(visit_stats_d["url"]) + scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) + + # Disable any origins if any + if disabled_urls: + disabled_origins = [] + for url in disabled_urls: + origins = scheduler.get_listed_origins(url=url).results + if len(origins) > 0: + origin = attr.evolve(origins[0], enabled=False) + disabled_origins.append(origin) + + if disabled_origins: + scheduler.record_listed_origins(disabled_origins) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 4f3a222..17b42f7 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,923 +1,996 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from datetime import timedelta import functools from itertools import permutations from typing import List from unittest.mock import Mock import attr import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import ( from_position_offset_to_days, max_date, next_visit_queue_position, process_journal_objects, ) from swh.scheduler.model import LastVisitStatus, ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) ONE_YEAR = datetime.timedelta(days=366) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( "dates,expected_max_date", [ ((DATE1,), DATE1), ((None, DATE2), DATE2), ((DATE1, None), DATE1), ((DATE1, DATE2), DATE2), ((DATE2, DATE1), DATE2), ((DATE1, DATE2, DATE3), DATE3), ((None, DATE2, DATE3), DATE3), ((None, None, DATE3), DATE3), ((DATE1, None, DATE3), DATE3), ], ) def test_max_date(dates, expected_max_date): assert max_date(*dates) == expected_max_date def test_max_date_raise(): with pytest.raises(ValueError, match="valid datetime"): max_date() with pytest.raises(ValueError, match="valid datetime"): max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. """ # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # All messages have been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_ignore_missing_type(swh_scheduler): """Ignore statuses with missing type key""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) date = utcnow() snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", "visit": 1, "status": "full", "date": date, "snapshot": snapshot, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # The message has been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def assert_visit_stats_ok( actual_visit_stats: OriginVisitStats, expected_visit_stats: OriginVisitStats, ignore_fields: List[str] = ["next_visit_queue_position"], ): """Utility test function to ensure visits stats read from the backend are in the right shape. The comparison on the next_visit_queue_position will be dealt with in dedicated tests so it's not tested in tests that are calling this function. """ fields = attr.fields_dict(OriginVisitStats) defaults = {field: fields[field].default for field in ignore_fields} actual_visit_stats = attr.evolve(actual_visit_stats, **defaults) assert actual_visit_stats == expected_visit_stats def test_journal_client_origin_visit_status_from_journal_last_not_found(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_visit=visit_status["date"], last_visit_status=LastVisitStatus.not_found, next_position_offset=4, successive_visits=1, ), ) visit_statuses = [ { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_visit=DATE3, last_visit_status=LastVisitStatus.not_found, next_position_offset=6, successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 2, "status": "full", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="bar", visit_type="git", last_visit=DATE3, last_visit_status=LastVisitStatus.failed, next_position_offset=6, successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 2, "status": "failed", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "bar", "visit": 3, "status": "failed", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="bar", visit_type="git", last_visit=DATE2, last_visit_status=LastVisitStatus.failed, next_position_offset=5, successive_visits=2, ), ) def test_journal_client_origin_visit_status_from_journal_last_successful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, { "origin": "foo", "visit": 2, "status": "partial", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, { "origin": "foo", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE3, last_visit=DATE3, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), next_position_offset=0, successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_successful=DATE2, last_visit=DATE3, last_visit_status=LastVisitStatus.failed, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=4, successive_visits=1, ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_visit=DATE3 + ONE_DAY, last_successful=DATE3 + ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=5, successive_visits=1, ), ) VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="foo", visit_type="git", last_successful=DATE1 + 3 * ONE_DAY, last_visit=DATE1 + 3 * ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), ignore_fields=[ "next_visit_queue_position", "next_position_offset", "successive_visits", ], ) # We ignore out of order messages, so the next_position_offset isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). assert 4 <= visit_stats.next_position_offset <= 5 # same goes for successive_visits assert 1 <= visit_stats.successive_visits <= 2 VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_visit_stats = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) visit_stats = actual_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="cavabarder", visit_type="hg", last_successful=DATE1 + 3 * ONE_DAY, last_visit=DATE1 + 3 * ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ), ignore_fields=[ "next_visit_queue_position", "next_position_offset", "successive_visits", ], ) # We ignore out of order messages, so the next_position_offset isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). assert 2 <= visit_stats.next_position_offset <= 5 # same goes for successive_visits assert 1 <= visit_stats.successive_visits <= 4 VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] assert ovs.last_successful == DATE1 + 5 * ONE_DAY assert ovs.last_visit == DATE1 + 5 * ONE_DAY assert ovs.last_visit_status == LastVisitStatus.successful assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) def test_journal_client_origin_visit_status_duplicated_messages(swh_scheduler): """A duplicated message must be ignored """ visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE1, last_visit=DATE1, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), successive_visits=1, ), ) def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): """An old message updates old information """ visit_status1 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } visit_status2 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status2]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE2, last_visit=DATE2, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), next_position_offset=4, successive_visits=1, ), ) VISIT_STATUSES_SAME_SNAPSHOT = [ {**ovs, "date": DATE1 + n * ONE_YEAR} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_SAME_SNAPSHOT, len(VISIT_STATUSES_SAME_SNAPSHOT)), ) def test_journal_client_origin_visit_statuses_same_snapshot_permutation( visit_statuses, swh_scheduler ): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [("cavabarder", "hg")] ) visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="cavabarder", visit_type="hg", last_successful=DATE1 + 2 * ONE_YEAR, last_visit=DATE1 + 2 * ONE_YEAR, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ), ignore_fields=[ "next_visit_queue_position", "next_position_offset", "successive_visits", ], ) # We ignore out of order messages, so the next_position_offset isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). assert 4 <= visit_stats.next_position_offset <= 6 # same goes for successive_visits assert 1 <= visit_stats.successive_visits <= 3 @pytest.mark.parametrize( "position_offset, interval", [ (0, 1), (1, 1), (2, 2), (3, 2), (4, 2), (5, 4), (6, 16), (7, 64), (8, 256), (9, 1024), (10, 4096), ], ) def test_journal_client_from_position_offset_to_days(position_offset, interval): assert from_position_offset_to_days(position_offset) == interval def test_journal_client_from_position_offset_to_days_only_positive_input(): with pytest.raises(AssertionError): from_position_offset_to_days(-1) @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),] ) def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() mock_now = mocker.patch("swh.scheduler.journal_client.utcnow") mock_now.return_value = date_now actual_position = next_visit_queue_position( {}, {"next_position_offset": next_position_offset, "visit_type": "svn",} ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_now.called assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),] ) def test_next_visit_queue_position_with_state( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {"git": date_now}, {"next_position_offset": next_position_offset, "visit_type": "git",}, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),] ) def test_next_visit_queue_position_with_next_visit_queue( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {}, { "next_position_offset": next_position_offset, "visit_type": "hg", "next_visit_queue_position": date_now, }, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called + + +def test_disable_failing_origins(swh_scheduler): + """Origin with too many failed attempts ends up being deactivated in the scheduler. + + """ + + # actually store the origin in the scheduler so we can check it's deactivated in the + # end. + lister = swh_scheduler.get_or_create_lister( + name="something", instance_name="something" + ) + origin = ListedOrigin( + url="bar", enabled=True, visit_type="svn", lister_id=lister.id + ) + swh_scheduler.record_listed_origins([origin]) + + visit_statuses = [ + { + "origin": "bar", + "visit": 2, + "status": "failed", + "date": DATE1, + "type": "svn", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 3, + "status": "failed", + "date": DATE2, + "type": "svn", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 3, + "status": "failed", + "date": DATE3, + "type": "svn", + "snapshot": None, + }, + ] + + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "svn")]) + assert_visit_stats_ok( + actual_origin_visit_stats[0], + OriginVisitStats( + url="bar", + visit_type="svn", + last_successful=None, + last_visit=DATE3, + last_visit_status=LastVisitStatus.failed, + next_position_offset=6, + successive_visits=3, + ), + ) + + # Now check that the origin in question is disabled + actual_page = swh_scheduler.get_listed_origins(url="bar") + + assert len(actual_page.results) == 1 + assert actual_page.next_page_token is None + + for origin in actual_page.results: + assert origin.enabled is False + assert origin.lister_id == lister.id + assert origin.url == "bar" + assert origin.visit_type == "svn"