diff --git a/swh/search/cli.py b/swh/search/cli.py --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -74,6 +74,7 @@ import functools from swh.journal.client import get_journal_client + from swh.storage import get_storage from . import get_search from .journal_client import process_journal_objects @@ -95,8 +96,11 @@ client = get_journal_client(cls="kafka", **journal_cfg,) search = get_search(**config["search"]) + storage = get_storage(**config.get("storage", {})) - worker_fn = functools.partial(process_journal_objects, search=search,) + worker_fn = functools.partial( + process_journal_objects, search=search, storage=storage + ) nb_messages = 0 try: nb_messages = client.process(worker_fn) diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py --- a/swh/search/elasticsearch.py +++ b/swh/search/elasticsearch.py @@ -41,6 +41,8 @@ "snapshot_id", "last_visit_date", "last_eventful_visit_date", + "last_revision_date", + "last_release_date", ): if field_name in origin: res[field_name] = origin.pop(field_name) @@ -160,6 +162,8 @@ "snapshot_id": {"type": "keyword"}, "last_visit_date": {"type": "date"}, "last_eventful_visit_date": {"type": "date"}, + "last_release_date": {"type": "date"}, + "last_revision_date": {"type": "date"}, "intrinsic_metadata": { "type": "nested", "properties": { @@ -196,6 +200,8 @@ ZonedDateTime last_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_visit_date", "0001-01-01T00:00:00Z")); String snapshot_id = ctx._source.getOrDefault("snapshot_id", ""); ZonedDateTime last_eventful_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_eventful_visit_date", "0001-01-01T00:00:00Z")); + ZonedDateTime last_revision_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_revision_date", "0001-01-01T00:00:00Z")); + ZonedDateTime last_release_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_release_date", "0001-01-01T00:00:00Z")); // update origin document with new field values ctx._source.putAll(params); @@ 
-237,6 +243,24 @@ ctx._source.last_eventful_visit_date = last_eventful_visit_date; } } + + // Undo overwrite if incoming last_revision_date is older + if (ctx._source.containsKey("last_revision_date")) { + ZonedDateTime incoming_last_revision_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_revision_date", "0001-01-01T00:00:00Z")); + int difference = incoming_last_revision_date.compareTo(last_revision_date); // returns -1, 0 or 1 + if(difference < 0){ + ctx._source.last_revision_date = last_revision_date; + } + } + + // Undo overwrite if incoming last_release_date is older + if (ctx._source.containsKey("last_release_date")) { + ZonedDateTime incoming_last_release_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_release_date", "0001-01-01T00:00:00Z")); + int difference = incoming_last_release_date.compareTo(last_release_date); // returns -1, 0 or 1 + if(difference < 0){ + ctx._source.last_release_date = last_release_date; + } + } """ # noqa ) @@ -282,6 +306,8 @@ min_nb_visits: int = 0, min_last_visit_date: str = "", min_last_eventful_visit_date: str = "", + min_last_revision_date: str = "", + min_last_release_date: str = "", page_token: Optional[str] = None, limit: int = 50, ) -> PagedResult[MinimalOriginDict]: @@ -357,6 +383,26 @@ } } ) + if min_last_revision_date: + query_clauses.append( + { + "range": { + "last_revision_date": { + "gte": min_last_revision_date.replace("Z", "+00:00"), + } + } + } + ) + if min_last_release_date: + query_clauses.append( + { + "range": { + "last_release_date": { + "gte": min_last_release_date.replace("Z", "+00:00"), + } + } + } + ) if visit_types is not None: query_clauses.append({"terms": {"visit_types": visit_types}}) diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py --- a/swh/search/in_memory.py +++ b/swh/search/in_memory.py @@ -100,6 +100,24 @@ document["snapshot_id"] = current_snapshot_id document["last_eventful_visit_date"] = current_date.isoformat() + if "last_revision_date" in document: + 
document["last_revision_date"] = max( + datetime.fromisoformat(document["last_revision_date"]), + datetime.fromisoformat( + self._origins[id_] + .get("last_revision_date", "0001-01-01T00:00:00Z",) + .replace("Z", "+00:00") + ), + ).isoformat() + if "last_release_date" in document: + document["last_release_date"] = max( + datetime.fromisoformat(document["last_release_date"]), + datetime.fromisoformat( + self._origins[id_] + .get("last_release_date", "0001-01-01T00:00:00Z",) + .replace("Z", "+00:00") + ), + ).isoformat() self._origins[id_].update(document) if id_ not in self._origin_ids: @@ -116,6 +134,8 @@ min_nb_visits: int = 0, min_last_visit_date: str = "", min_last_eventful_visit_date: str = "", + min_last_revision_date: str = "", + min_last_release_date: str = "", limit: int = 50, ) -> PagedResult[MinimalOriginDict]: hits: Iterator[Dict[str, Any]] = ( @@ -191,6 +211,27 @@ hits, ) + if min_last_revision_date: + hits = filter( + lambda o: datetime.fromisoformat( + o.get("last_revision_date", "0001-01-01T00:00:00Z").replace( + "Z", "+00:00" + ) + ) + >= datetime.fromisoformat(min_last_revision_date), + hits, + ) + if min_last_release_date: + hits = filter( + lambda o: datetime.fromisoformat( + o.get("last_release_date", "0001-01-01T00:00:00Z").replace( + "Z", "+00:00" + ) + ) + >= datetime.fromisoformat(min_last_release_date), + hits, + ) + if visit_types is not None: visit_types_set = set(visit_types) hits = filter( diff --git a/swh/search/interface.py b/swh/search/interface.py --- a/swh/search/interface.py +++ b/swh/search/interface.py @@ -62,6 +62,8 @@ min_nb_visits: int = 0, min_last_visit_date: str = "", min_last_eventful_visit_date: str = "", + min_last_revision_date: str = "", + min_last_release_date: str = "", limit: int = 50, ) -> PagedResult[MinimalOriginDict]: """Searches for origins matching the `url_pattern`. 
def fetch_last_revision_release_date(snapshot_id):
    """Return the most recent revision/release dates reachable from the
    branch tips of the given snapshot.

    Nested helper of ``process_origin_visit_statuses``; closes over
    ``storage`` and ``snapshot_get_all_branches`` from the enclosing scope.

    Returns:
        A dict with ``last_revision_date`` and/or ``last_release_date``
        keys (ISO-format strings). Each key is present only when at least
        one branch tip of the matching type has a usable date; an empty
        dict is returned when there is no snapshot or it cannot be loaded.
    """
    if not snapshot_id:
        return {}

    snapshot = snapshot_get_all_branches(storage, snapshot_id)
    if snapshot is None:
        # Snapshot not (yet) present in storage: nothing to compute.
        return {}

    tip_revision_ids = []
    tip_release_ids = []
    for branch in snapshot.branches.values():
        # Branches may be None (dangling); skip those.
        if branch is None:
            continue
        if branch.target_type == TargetType.REVISION:
            tip_revision_ids.append(branch.target)
        elif branch.target_type == TargetType.RELEASE:
            tip_release_ids.append(branch.target)

    # storage.*_get returns None for missing objects, and a release's
    # date is optional (e.g. unsigned tags) — filter both out so
    # .to_datetime() is only called on real dates.
    revision_datetimes = [
        revision.date.to_datetime()
        for revision in storage.revision_get(tip_revision_ids)
        if revision is not None and revision.date is not None
    ]
    release_datetimes = [
        release.date.to_datetime()
        for release in storage.release_get(tip_release_ids)
        if release is not None and release.date is not None
    ]

    # max() raises ValueError on an empty sequence, so only emit a key
    # when we actually found at least one dated object of that type
    # (snapshots may contain only revisions, only releases, or neither).
    ret = {}
    if revision_datetimes:
        ret["last_revision_date"] = max(revision_datetimes).isoformat()
    if release_datetimes:
        ret["last_release_date"] = max(release_datetimes).isoformat()
    return ret
search=search_mock,) + storage = get_storage("memory") + + DATES = [ + TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567891, microseconds=0,), + offset=120, + negative_utc=False, + ), + TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567892, microseconds=0,), + offset=120, + negative_utc=False, + ), + ] + + COMMITTERS = [ + Person(fullname=b"foo", name=b"foo", email=b""), + Person(fullname=b"bar", name=b"bar", email=b""), + ] + + REVISIONS = [ + Revision( + id=hash_to_bytes("66c7c1cd9673275037140f2abff7b7b11fc9439c"), + message=b"hello", + date=DATES[0], + committer=COMMITTERS[0], + author=COMMITTERS[0], + committer_date=DATES[0], + type=RevisionType.GIT, + directory=b"\x01" * 20, + synthetic=False, + metadata=None, + parents=( + hash_to_bytes("9b918dd063cec85c2bc63cc7f167e29f5894dcbc"), + hash_to_bytes("757f38bdcd8473aaa12df55357f5e2f1a318e672"), + ), + ), + Revision( + id=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"), + message=b"hello again", + date=DATES[1], + committer=COMMITTERS[1], + author=COMMITTERS[1], + committer_date=DATES[1], + type=RevisionType.MERCURIAL, + directory=b"\x02" * 20, + synthetic=False, + metadata=None, + parents=(), + extra_headers=((b"foo", b"bar"),), + ), + ] + + RELEASES = [ + Release( + id=hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"), + name=b"v0.0.1", + date=TimestampWithTimezone( + timestamp=Timestamp(seconds=1234567890, microseconds=0,), + offset=120, + negative_utc=False, + ), + author=COMMITTERS[0], + target_type=ObjectType.REVISION, + target=b"\x04" * 20, + message=b"foo", + synthetic=False, + ), + Release( + id=hash_to_bytes("ee4d20e80af850cc0f417d25dc5073792c5010d2"), + name=b"this-is-a/tag/1.0", + date=None, + author=None, + target_type=ObjectType.DIRECTORY, + target=b"\x05" * 20, + message=b"bar", + synthetic=False, + ), + ] + + SNAPSHOTS = [ + Snapshot( + id=hash_to_bytes("0e7f84ede9a254f2cd55649ad5240783f557e65f"), + branches={ + b"target/revision1": SnapshotBranch( + 
target_type=TargetType.REVISION, target=REVISIONS[0].id, + ), + b"target/revision2": SnapshotBranch( + target_type=TargetType.REVISION, target=REVISIONS[0].id, + ), + b"target/revision3": SnapshotBranch( + target_type=TargetType.REVISION, target=REVISIONS[0].id, + ), + b"target/release1": SnapshotBranch( + target_type=TargetType.RELEASE, target=RELEASES[0].id + ), + b"target/release2": SnapshotBranch( + target_type=TargetType.RELEASE, target=RELEASES[0].id + ), + b"target/release3": SnapshotBranch( + target_type=TargetType.RELEASE, target=RELEASES[0].id + ), + b"target/alias": SnapshotBranch( + target_type=TargetType.ALIAS, target=b"target/revision" + ), + }, + ), + ] + + storage.revision_add(REVISIONS) + storage.release_add(RELEASES) + storage.snapshot_add(SNAPSHOTS) + + worker_fn = functools.partial( + process_journal_objects, search=search_mock, storage=storage + ) current_datetime = datetime.now(tz=timezone.utc) worker_fn( @@ -53,7 +182,7 @@ "status": "full", "visit": 5, "date": current_datetime, - "snapshot": None, + "snapshot": SNAPSHOTS[0].id, } # full visits ok ] } @@ -64,9 +193,11 @@ "url": "http://foobar.baz", "has_visits": True, "nb_visits": 5, - "snapshot_id": None, + "snapshot_id": SNAPSHOTS[0].id, "last_visit_date": current_datetime.isoformat(), "last_eventful_visit_date": current_datetime.isoformat(), + "last_revision_date": "2009-02-14T01:31:31+02:00", + "last_release_date": "2009-02-14T01:31:30+02:00", }, ] ) diff --git a/swh/search/tests/test_search.py b/swh/search/tests/test_search.py --- a/swh/search/tests/test_search.py +++ b/swh/search/tests/test_search.py @@ -361,6 +361,86 @@ _check_min_last_eventful_visit_date(now_plus_5_hours) # Works for = _check_min_last_eventful_visit_date(now) # Works for < + def test_origin_last_revision_date_update_search(self): + origin_url = "http://foobar.baz" + self.search.origin_update([{"url": origin_url}]) + self.search.flush() + + def _update_last_revision_date(last_revision_date): + self.search.origin_update( 
+ [{"url": origin_url, "last_revision_date": last_revision_date,}] + ) + self.search.flush() + + def _check_min_last_revision_date(min_last_revision_date): + actual_page = self.search.origin_search( + url_pattern=origin_url, min_last_revision_date=min_last_revision_date, + ) + assert actual_page.next_page_token is None + results = [r["url"] for r in actual_page.results] + expected_results = [origin_url] + assert sorted(results) == sorted(expected_results) + + now = datetime.now(tz=timezone.utc).isoformat() + now_minus_5_hours = ( + datetime.now(tz=timezone.utc) - timedelta(hours=5) + ).isoformat() + now_plus_5_hours = ( + datetime.now(tz=timezone.utc) + timedelta(hours=5) + ).isoformat() + + _update_last_revision_date(now) + + _check_min_last_revision_date(now) + _check_min_last_revision_date(now_minus_5_hours) + with pytest.raises(AssertionError): + _check_min_last_revision_date(now_plus_5_hours) + + _update_last_revision_date(now_plus_5_hours) + + _check_min_last_revision_date(now_plus_5_hours) + _check_min_last_revision_date(now) + + def test_origin_last_release_date_update_search(self): + origin_url = "http://foobar.baz" + self.search.origin_update([{"url": origin_url}]) + self.search.flush() + + def _update_last_release_date(last_release_date): + self.search.origin_update( + [{"url": origin_url, "last_release_date": last_release_date,}] + ) + self.search.flush() + + def _check_min_last_release_date(min_last_release_date): + actual_page = self.search.origin_search( + url_pattern=origin_url, min_last_release_date=min_last_release_date, + ) + assert actual_page.next_page_token is None + results = [r["url"] for r in actual_page.results] + expected_results = [origin_url] + assert sorted(results) == sorted(expected_results) + + now = datetime.now(tz=timezone.utc).isoformat() + now_minus_5_hours = ( + datetime.now(tz=timezone.utc) - timedelta(hours=5) + ).isoformat() + now_plus_5_hours = ( + datetime.now(tz=timezone.utc) + timedelta(hours=5) + ).isoformat() + + 
_update_last_release_date(now) + + _check_min_last_release_date(now) + _check_min_last_release_date(now_minus_5_hours) + with pytest.raises(AssertionError): + _check_min_last_release_date(now_plus_5_hours) + + _update_last_release_date(now_plus_5_hours) + + _check_min_last_release_date(now_plus_5_hours) + _check_min_last_release_date(now) + def test_origin_update_with_no_visit_types(self): """ Update an origin with visit types first then with no visit types,