diff --git a/sql/updates/19.sql b/sql/updates/19.sql
--- a/sql/updates/19.sql
+++ b/sql/updates/19.sql
@@ -7,6 +7,8 @@
   last_eventful timestamptz,
   last_uneventful timestamptz,
   last_failed timestamptz,
+  last_notfound timestamptz,
+  last_snapshot bytea,
 
   primary key (url, visit_type)
 );
@@ -16,3 +18,5 @@
 comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
 comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event';
 comment on column origin_visit_stats.last_failed is 'Date of the last failed event';
+comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event';
+comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -770,23 +770,48 @@
                 visit_type,
                 last_eventful,
                 last_uneventful,
-                last_failed
+                last_failed,
+                last_notfound,
+                last_snapshot
             )
-            VALUES (%s, %s, %s, %s, %s) ON CONFLICT (url, visit_type) DO
+            VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url, visit_type) DO
             UPDATE
-              SET last_eventful = coalesce(
-                    excluded.last_eventful,
-                    ovi.last_eventful
+              SET last_eventful = (
+                    select max(eventful.date) from (values
+                      (excluded.last_eventful),
+                      (ovi.last_eventful)
+                    ) as eventful(date)
                   ),
-                  last_uneventful = coalesce(
-                    excluded.last_uneventful,
-                    ovi.last_uneventful
+                  last_uneventful = (
+                    select max(uneventful.date) from (values
+                      (excluded.last_uneventful),
+                      (ovi.last_uneventful)
+                    ) as uneventful(date)
                   ),
-                  last_failed = coalesce(
-                    excluded.last_failed,
-                    ovi.last_failed
+                  last_failed = (
+                    select max(failed.date) from (values
+                      (excluded.last_failed),
+                      (ovi.last_failed)
+                    ) as failed(date)
+                  ),
+                  last_notfound = (
+                    select max(notfound.date) from (values
+                      (excluded.last_notfound),
+                      (ovi.last_notfound)
+                    ) as notfound(date)
+                  ),
+                  -- keep the snapshot of the most recent eventful visit; a NULL
+                  -- previous eventful date means there is no snapshot to keep yet
+                  last_snapshot = (select
+                    case
+                    when excluded.last_eventful is not null
+                         and (ovi.last_eventful is null
+                              or ovi.last_eventful < excluded.last_eventful)
+                    then excluded.last_snapshot
+                    else ovi.last_snapshot
+                    end
                   )
-        """
+        """  # noqa
 
         cur.execute(
             query,
@@ -796,6 +821,8 @@
                 visit_stats.last_eventful,
                 visit_stats.last_uneventful,
                 visit_stats.last_failed,
+                visit_stats.last_notfound,
+                visit_stats.last_snapshot,
             ),
         )
 
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/journal_client.py
@@ -0,0 +1,107 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime
+from typing import Dict, List, Optional
+
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.model import OriginVisitStats
+
+msg_type = "origin_visit_status"
+
+
+def max_date(d1: Optional[datetime], d2: Optional[datetime]) -> datetime:
+    """Return the most recent of two dates, ignoring a missing (None) one.
+
+    Raises:
+        ValueError: if both dates are None
+
+    """
+    if d1 is None and d2 is None:
+        raise ValueError("At least one date should be a valid datetime")
+    if d1 is None:
+        assert d2 is not None  # make mypy happy
+        return d2
+    if d2 is None:
+        return d1
+    return max(d1, d2)
+
+
+def process_journal_objects(
+    messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
+) -> None:
+    """Read messages from origin_visit_status journal topics, then inserts them in the
+    scheduler "origin_visit_stats" table.
+
+    Only final visit statuses are taken into account ("created" and "ongoing"
+    ones are skipped): "not_found" updates `last_notfound`, a final status
+    without snapshot updates `last_failed`, and a final status with a snapshot
+    updates `last_eventful` (snapshot differs from the last recorded one) or
+    `last_uneventful` (same snapshot).
+
+    Worker function for `JournalClient.process(worker_fn)`, after
+    currification of `scheduler`.
+
+    """
+    assert set(messages) <= {
+        msg_type
+    }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types"
+    assert msg_type in messages, f"Expected {msg_type} messages"
+
+    for msg_dict in messages[msg_type]:
+        if msg_dict["status"] in ("created", "ongoing"):
+            continue
+        origin = msg_dict["origin"]
+        visit_type = msg_dict["type"]
+        visit_stats_d = {
+            "url": origin,
+            "visit_type": visit_type,
+            "last_uneventful": None,
+            "last_eventful": None,
+            "last_failed": None,
+            "last_notfound": None,
+            "last_snapshot": None,
+        }
+        actual_visit_stats = scheduler.origin_visit_stats_get(origin, visit_type)
+
+        if msg_dict["status"] == "not_found":
+            visit_stats_d["last_notfound"] = max_date(
+                msg_dict["date"],
+                actual_visit_stats.last_notfound if actual_visit_stats else None,
+            )
+        elif msg_dict["snapshot"] is None:
+            visit_stats_d["last_failed"] = max_date(
+                msg_dict["date"],
+                actual_visit_stats.last_failed if actual_visit_stats else None,
+            )
+        else:  # visit with snapshot, something happened
+            if not actual_visit_stats:
+                visit_stats_d["last_eventful"] = msg_dict["date"]
+                visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
+            else:
+                # Date of the last successful visit, if any. The origin may only
+                # have failed or not found visits so far, in which case there is
+                # nothing to compare against (and max_date would raise).
+                previous_date: Optional[datetime] = None
+                if (
+                    actual_visit_stats.last_eventful is not None
+                    or actual_visit_stats.last_uneventful is not None
+                ):
+                    previous_date = max_date(
+                        actual_visit_stats.last_eventful,
+                        actual_visit_stats.last_uneventful,
+                    )
+                if previous_date is not None and msg_dict["date"] < previous_date:
+                    # ignore out of order message
+                    continue
+                previous_snapshot = actual_visit_stats.last_snapshot
+                if msg_dict["snapshot"] != previous_snapshot:
+                    visit_stats_d["last_eventful"] = msg_dict["date"]
+                    visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
+                else:
+                    visit_stats_d["last_uneventful"] = msg_dict["date"]
+
+        visit_stats = OriginVisitStats(**visit_stats_d)
+        scheduler.origin_visit_stats_upsert(visit_stats)
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -230,6 +230,12 @@
         type=Optional[datetime.datetime], validator=type_validator()
     )
     last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator())
+    last_notfound = attr.ib(
+        type=Optional[datetime.datetime], validator=type_validator()
+    )
+    last_snapshot = attr.ib(
+        type=Optional[bytes], validator=type_validator(), default=None
+    )
 
     @last_eventful.validator
     def check_last_eventful(self, attribute, value):
@@ -242,3 +248,7 @@
     @last_failed.validator
     def check_last_failed(self, attribute, value):
         check_timestamptz(value)
+
+    @last_notfound.validator
+    def check_last_notfound(self, attribute, value):
+        check_timestamptz(value)
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -11,7 +11,7 @@
 comment on column dbversion.description is 'Version description';
 
 insert into dbversion (version, release, description)
-       values (20, now(), 'Work In Progress');
+       values (22, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
@@ -171,6 +171,8 @@
   last_eventful timestamptz,
   last_uneventful timestamptz,
   last_failed timestamptz,
+  last_notfound timestamptz,
+  last_snapshot bytea,
 
   primary key (url, visit_type)
 );
@@ -180,3 +182,5 @@
 comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
 comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event';
 comment on column origin_visit_stats.last_failed is 'Date of the last failed event';
+comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event';
+comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -0,0 +1,397 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import functools
+
+import pytest
+
+from swh.model.hashutil import hash_to_bytes
+from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
+
+
+def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+
+    with pytest.raises(AssertionError, match="Got unexpected origin_visit"):
+        process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
+
+    with pytest.raises(AssertionError, match="Expected origin_visit_status"):
+        process_fn({})
+
+
+ONE_DAY = datetime.timedelta(days=1)
+
+DATE3 = utcnow()
+DATE2 = DATE3 - ONE_DAY
+DATE1 = DATE2 - ONE_DAY
+
+
+assert DATE1 < DATE2 < DATE3
+
+
+@pytest.mark.parametrize(
+    "d1,d2,expected_max_date",
+    [
+        (None, DATE2, DATE2),
+        (DATE1, None, DATE1),
+        (DATE1, DATE2, DATE2),
+        (DATE2, DATE1, DATE2),
+    ],
+)
+def test_max_date(d1, d2, expected_max_date):
+    assert max_date(d1, d2) == expected_max_date
+
+
+def test_max_date_raise():
+    with pytest.raises(ValueError, match="valid datetime"):
+        max_date(None, None)
+
+
+def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler):
+    """Non-final statuses (created, ongoing) are ignored.
+
+    """
+    visit_statuses = [
+        {
+            "origin": "foo",
+            "visit": 1,
+            "status": "created",
+            "date": utcnow(),
+            "type": "git",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 1,
+            "status": "ongoing",
+            "date": utcnow(),
+            "type": "svn",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    # Ensure those visit status are ignored
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is None
+
+
+def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler):
+    visit_status = {
+        "origin": "foo",
+        "visit": 1,
+        "status": "not_found",
+        "date": DATE1,
+        "type": "git",
+        "snapshot": None,
+    }
+
+    process_journal_objects(
+        {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
+    )
+
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+        visit_status["origin"], visit_status["type"]
+    )
+    assert actual_origin_visit_stats == OriginVisitStats(
+        url=visit_status["origin"],
+        visit_type=visit_status["type"],
+        last_eventful=None,
+        last_uneventful=None,
+        last_failed=None,
+        last_notfound=visit_status["date"],
+        last_snapshot=None,
+    )
+
+    visit_statuses = [
+        {
+            "origin": "foo",
+            "visit": 4,
+            "status": "not_found",
+            "date": DATE3,
+            "type": "git",
+            "snapshot": None,
+        },
+        {
+            "origin": "foo",
+            "visit": 3,
+            "status": "not_found",
+            "date": DATE2,
+            "type": "git",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is not None
+        assert actual_origin_visit_stats == OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=None,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=DATE3,
+            last_snapshot=None,
+        )
+
+
+def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler):
+    visit_statuses = [
+        {
+            "origin": "foo",
+            "visit": 1,
+            "status": "partial",
+            "date": utcnow(),
+            "type": "git",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 2,
+            "status": "full",
+            "date": DATE1,
+            "type": "git",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    # Ensure those visit statuses are recorded as failed visits
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is not None
+        assert actual_origin_visit_stats == OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=None,
+            last_uneventful=None,
+            last_failed=visit_status["date"],
+            last_notfound=None,
+            last_snapshot=None,
+        )
+
+    visit_statuses = [
+        {
+            "origin": "bar",
+            "visit": 3,
+            "status": "full",
+            "date": DATE3,
+            "type": "git",
+            "snapshot": None,
+        },
+        {
+            "origin": "bar",
+            "visit": 3,
+            "status": "full",
+            "date": DATE2,
+            "type": "git",
+            "snapshot": None,
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is not None
+        assert actual_origin_visit_stats == OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=None,
+            last_uneventful=None,
+            last_failed=DATE3,
+            last_notfound=None,
+            last_snapshot=None,
+        )
+
+
+def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler):
+    visit_statuses = [
+        {
+            "origin": "bar",
+            "visit": 1,
+            "status": "partial",
+            "date": utcnow(),
+            "type": "git",
+            "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        },
+        {
+            "origin": "foo",
+            "visit": 2,
+            "status": "full",
+            "date": DATE1,
+            "type": "git",
+            "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"),
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is not None
+        assert actual_origin_visit_stats == OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=visit_status["date"],
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=visit_status["snapshot"],
+        )
+
+    most_recent_date = DATE3
+    most_recent_snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd")
+    visit_statuses = [
+        {
+            "origin": "foo",
+            "visit": 3,
+            "status": "full",
+            "date": most_recent_date,
+            "type": "git",
+            "snapshot": most_recent_snapshot,
+        },
+        {
+            "origin": "foo",
+            "visit": 3,
+            "status": "partial",
+            "date": DATE2,
+            "type": "git",
+            "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"),
+        },
+    ]
+
+    process_journal_objects(
+        {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
+    )
+
+    for visit_status in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            visit_status["origin"], visit_status["type"]
+        )
+        assert actual_origin_visit_stats is not None
+        assert actual_origin_visit_stats == OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=most_recent_date,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=most_recent_snapshot,
+        )
+
+
+def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler):
+    visit_status = {
+        "origin": "foo",
+        "visit": 1,
+        "status": "full",
+        "date": DATE3 + ONE_DAY,
+        "type": "git",
+        "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+    }
+
+    # Let's insert some visit stats with some previous visit information
+    swh_scheduler.origin_visit_stats_upsert(
+        OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=DATE1,
+            last_uneventful=DATE3,
+            last_failed=DATE2,
+            last_notfound=DATE1,
+            last_snapshot=visit_status["snapshot"],
+        )
+    )
+
+    process_journal_objects(
+        {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
+    )
+
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+        visit_status["origin"], visit_status["type"]
+    )
+    assert actual_origin_visit_stats is not None
+    assert actual_origin_visit_stats == OriginVisitStats(
+        url=visit_status["origin"],
+        visit_type=visit_status["type"],
+        last_eventful=DATE1,
+        last_uneventful=visit_status["date"],  # most recent date but uneventful
+        last_failed=DATE2,
+        last_notfound=DATE1,
+        last_snapshot=visit_status["snapshot"],
+    )
+
+
+def test_journal_client_origin_visit_status_after_failed_visit(swh_scheduler):
+    """A visit with a snapshot following only failed/not found visits is eventful.
+
+    """
+    visit_status = {
+        "origin": "foo",
+        "visit": 3,
+        "status": "full",
+        "date": DATE3,
+        "type": "git",
+        "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+    }
+
+    # The origin was visited before, but never successfully
+    swh_scheduler.origin_visit_stats_upsert(
+        OriginVisitStats(
+            url=visit_status["origin"],
+            visit_type=visit_status["type"],
+            last_eventful=None,
+            last_uneventful=None,
+            last_failed=DATE2,
+            last_notfound=DATE1,
+        )
+    )
+
+    process_journal_objects(
+        {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
+    )
+
+    actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+        visit_status["origin"], visit_status["type"]
+    )
+    assert actual_origin_visit_stats == OriginVisitStats(
+        url=visit_status["origin"],
+        visit_type=visit_status["type"],
+        last_eventful=visit_status["date"],
+        last_uneventful=None,
+        last_failed=DATE2,
+        last_notfound=DATE1,
+        last_snapshot=visit_status["snapshot"],
+    )
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -14,6 +14,7 @@
 import attr
 import pytest
 
+from swh.model.hashutil import hash_to_bytes
 from swh.scheduler.exc import StaleData, UnknownPolicy
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin, ListedOriginPageToken, OriginVisitStats
@@ -782,6 +783,7 @@
             last_eventful=eventful_date,
             last_uneventful=None,
             last_failed=None,
+            last_notfound=None,
         )
         swh_scheduler.origin_visit_stats_upsert(visit_stats)
         swh_scheduler.origin_visit_stats_upsert(visit_stats)
@@ -796,6 +798,7 @@
             last_eventful=None,
             last_uneventful=uneventful_date,
             last_failed=None,
+            last_notfound=None,
         )
         swh_scheduler.origin_visit_stats_upsert(visit_stats)
 
@@ -807,6 +810,7 @@
             last_eventful=eventful_date,
             last_uneventful=uneventful_date,
             last_failed=None,
+            last_notfound=None,
         )
 
         assert uneventful_visit == expected_visit_stats
@@ -818,6 +822,7 @@
             last_eventful=None,
             last_uneventful=None,
             last_failed=failed_date,
+            last_notfound=None,
         )
         swh_scheduler.origin_visit_stats_upsert(visit_stats)
 
@@ -829,6 +834,118 @@
             last_eventful=eventful_date,
             last_uneventful=uneventful_date,
             last_failed=failed_date,
+            last_notfound=None,
         )
 
         assert failed_visit == expected_visit_stats
+
+    def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None:
+        eventful_date = utcnow()
+        url = "https://github.com/666/test"
+
+        visit_stats = OriginVisitStats(
+            url=url,
+            visit_type="git",
+            last_eventful=eventful_date,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        )
+        swh_scheduler.origin_visit_stats_upsert(visit_stats)
+
+        assert swh_scheduler.origin_visit_stats_get(url, "git") == visit_stats
+        assert swh_scheduler.origin_visit_stats_get(url, "svn") is None
+
+    def test_origin_visit_stats_upsert_messing_with_time(self, swh_scheduler) -> None:
+        url = "interesting-origin"
+
+        # Let's play with dates...
+        date2 = utcnow()
+        date1 = date2 - ONEDAY
+        date0 = date1 - ONEDAY
+        assert date0 < date1 < date2
+
+        snapshot2 = hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd")
+        snapshot0 = hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff")
+        visit_stats0 = OriginVisitStats(
+            url=url,
+            visit_type="git",
+            last_eventful=date2,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=snapshot2,
+        )
+        swh_scheduler.origin_visit_stats_upsert(visit_stats0)
+
+        actual_visit_stats0 = swh_scheduler.origin_visit_stats_get(url, "git")
+        assert actual_visit_stats0 == visit_stats0
+
+        visit_stats2 = OriginVisitStats(
+            url=url,
+            visit_type="git",
+            last_eventful=None,
+            last_uneventful=date1,
+            last_notfound=None,
+            last_failed=None,
+        )
+        swh_scheduler.origin_visit_stats_upsert(visit_stats2)
+
+        actual_visit_stats2 = swh_scheduler.origin_visit_stats_get(url, "git")
+        assert actual_visit_stats2 == attr.evolve(
+            actual_visit_stats0, last_uneventful=date1
+        )
+
+        # a past date, what happens?
+        # date0 < date2 so this ovs should be dismissed
+        # the "eventful" associated snapshot should be dismissed as well
+        visit_stats1 = OriginVisitStats(
+            url=url,
+            visit_type="git",
+            last_eventful=date0,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=snapshot0,
+        )
+        swh_scheduler.origin_visit_stats_upsert(visit_stats1)
+
+        actual_visit_stats1 = swh_scheduler.origin_visit_stats_get(url, "git")
+
+        assert actual_visit_stats1 == attr.evolve(
+            actual_visit_stats2, last_eventful=date2
+        )
+
+    def test_origin_visit_stats_upsert_first_snapshot(self, swh_scheduler) -> None:
+        """A first eventful visit stores its snapshot even if stats already exist."""
+        url = "https://example.org/failed-then-eventful"
+        failed_date = utcnow() - ONEDAY
+        eventful_date = utcnow()
+        snapshot = hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd")
+
+        swh_scheduler.origin_visit_stats_upsert(
+            OriginVisitStats(
+                url=url,
+                visit_type="git",
+                last_eventful=None,
+                last_uneventful=None,
+                last_failed=failed_date,
+                last_notfound=None,
+            )
+        )
+
+        eventful_visit_stats = OriginVisitStats(
+            url=url,
+            visit_type="git",
+            last_eventful=eventful_date,
+            last_uneventful=None,
+            last_failed=None,
+            last_notfound=None,
+            last_snapshot=snapshot,
+        )
+        swh_scheduler.origin_visit_stats_upsert(eventful_visit_stats)
+
+        assert swh_scheduler.origin_visit_stats_get(url, "git") == attr.evolve(
+            eventful_visit_stats, last_failed=failed_date
+        )