diff --git a/swh/search/journal_client.py b/swh/search/journal_client.py
--- a/swh/search/journal_client.py
+++ b/swh/search/journal_client.py
@@ -12,7 +12,6 @@
 EXPECTED_MESSAGE_TYPES = {
     "origin",
-    "origin_visit",
     "origin_visit_status",
     "origin_intrinsic_metadata",
 }
 
@@ -68,9 +67,6 @@
     if "origin" in messages:
         process_origins(messages["origin"], search)
 
-    if "origin_visit" in messages:
-        process_origin_visits(messages["origin_visit"], search)
-
     if "origin_visit_status" in messages:
         process_origin_visit_statuses(messages["origin_visit_status"], search, storage)
 
@@ -84,24 +80,6 @@
         search.origin_update(origins)
 
 
-def process_origin_visits(visits, search):
-    logging.debug("processing origin visits %r", visits)
-
-    search.origin_update(
-        [
-            {
-                "url": (
-                    visit["origin"]
-                    if isinstance(visit["origin"], str)
-                    else visit["origin"]["url"]
-                ),
-                "visit_types": [visit["type"]],
-            }
-            for visit in visits
-        ]
-    )
-
-
 def process_origin_visit_statuses(visit_statuses, search, storage):
     logging.debug("processing origin visit statuses %r", visit_statuses)
 
@@ -110,22 +88,29 @@
             return None
         return b.hex()
 
-    full_visit_status = [
-        {
+    processed_visit_statuses = []
+    for visit_status in visit_statuses:
+        processed_status = {
             "url": visit_status["origin"],
-            "has_visits": True,
-            "nb_visits": visit_status["visit"],
-            "snapshot_id": hexify(visit_status.get("snapshot")),
-            "last_visit_date": visit_status["date"].isoformat(),
-            "last_eventful_visit_date": visit_status["date"].isoformat(),
-            **fetch_last_revision_release_date(visit_status.get("snapshot"), storage),
+            "visit_types": [visit_status["type"]],
         }
-        for visit_status in visit_statuses
-        if visit_status["status"] == "full"
-    ]
-
-    if full_visit_status:
-        search.origin_update(full_visit_status)
+        if visit_status["status"] == "full":
+            processed_status.update(
+                {
+                    "has_visits": True,
+                    "nb_visits": visit_status["visit"],
+                    "snapshot_id": hexify(visit_status.get("snapshot")),
+                    "last_visit_date": visit_status["date"].isoformat(),
+                    "last_eventful_visit_date": visit_status["date"].isoformat(),
+                    **fetch_last_revision_release_date(
+                        visit_status.get("snapshot"), storage
+                    ),
+                }
+            )
+        processed_visit_statuses.append(processed_status)
+
+    if processed_visit_statuses:
+        search.origin_update(processed_visit_statuses)
 
 
 def process_origin_intrinsic_metadata(origin_metadata, search):
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -112,57 +112,6 @@
     assert actual_page.results == []
 
 
-def test__journal_client__origin_visit(
-    swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
-):
-    """Tests the re-indexing when origin_batch_size*task_batch_size is a
-    divisor of nb_origins."""
-    origin_foobar = {"url": "http://baz.foobar"}
-    producer = Producer(
-        {
-            "bootstrap.servers": kafka_server,
-            "client.id": "test search origin visit producer",
-            "acks": "all",
-        }
-    )
-    topic = f"{kafka_prefix}.origin_visit"
-    value = value_to_kafka({"origin": origin_foobar["url"], "type": "git"})
-    producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
-
-    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
-        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
-    )
-    result = invoke(
-        False,
-        [
-            "journal-client",
-            "objects",
-            "--stop-after-objects",
-            "1",
-            "--object-type",
-            "origin_visit",
-        ],
-        journal_objects_config,
-        elasticsearch_host=elasticsearch_host,
-    )
-
-    # Check the output
-    expected_output = "Processed 1 messages.\nDone.\n"
-    assert result.exit_code == 0, result.output
-    assert result.output == expected_output
-
-    swh_search.flush()
-
-    actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False)
-    assert actual_page.next_page_token is None
-    assert actual_page.results == [origin_foobar]
-
-    # Not considered visited unless the visit is full
-    actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)
-    assert actual_page.next_page_token is None
-    assert actual_page.results == []
-
-
 def test__journal_client__origin_visit_status(
     swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
 ):
@@ -182,6 +131,7 @@
         {
             "origin": origin_foobar["url"],
             "visit": 1,
+            "type": "git",
             "date": datetime.now(tz=timezone.utc),
             "snapshot": None,
             "status": "full",
diff --git a/swh/search/tests/test_journal_client.py b/swh/search/tests/test_journal_client.py
--- a/swh/search/tests/test_journal_client.py
+++ b/swh/search/tests/test_journal_client.py
@@ -201,17 +201,6 @@
     )
 
 
-def test_journal_client_origin_visit_from_journal():
-    search_mock = MagicMock()
-
-    worker_fn = functools.partial(process_journal_objects, search=search_mock,)
-
-    worker_fn({"origin_visit": [{"origin": "http://foobar.baz", "type": "git"},]})
-    search_mock.origin_update.assert_called_once_with(
-        [{"url": "http://foobar.baz", "visit_types": ["git"]},]
-    )
-
-
 def test_journal_client_origin_visit_status_from_journal(storage):
     search_mock = MagicMock()
 
@@ -226,6 +215,7 @@
         {
             "origin": "http://foobar.baz",
             "status": "full",
+            "type": "git",
             "visit": 5,
             "date": current_datetime,
             "snapshot": SNAPSHOTS[0].id,
@@ -237,6 +227,7 @@
         [
             {
                 "url": "http://foobar.baz",
+                "visit_types": ["git"],
                 "has_visits": True,
                 "nb_visits": 5,
                 "snapshot_id": SNAPSHOTS[0].id.hex(),
@@ -250,12 +241,13 @@
     search_mock.reset_mock()
 
-    # non-full visits are filtered out
+    # non-full visits only set the visit_types attribute
     worker_fn(
         {
            "origin_visit_status": [
                 {
                     "origin": "http://foobar.baz",
+                    "type": "git",
                     "status": "partial",
                     "visit": 5,
                     "date": current_datetime,
                 }
@@ -263,7 +255,9 @@
         }
     )
-    search_mock.origin_update.assert_not_called()
+    search_mock.origin_update.assert_called_once_with(
+        [{"url": "http://foobar.baz", "visit_types": ["git"]}]
+    )
 
 
 def test_journal_client_origin_metadata_from_journal():