diff --git a/swh/search/journal_client.py b/swh/search/journal_client.py
index cfbf25a..2812901 100644
--- a/swh/search/journal_client.py
+++ b/swh/search/journal_client.py
@@ -1,139 +1,124 @@
 # Copyright (C) 2018-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from typing import Dict, Optional

 from swh.model.model import TargetType
 from swh.storage.algos.snapshot import snapshot_get_all_branches
 from swh.storage.interface import StorageInterface

 EXPECTED_MESSAGE_TYPES = {
     "origin",
-    "origin_visit",
     "origin_visit_status",
     "origin_intrinsic_metadata",
 }


 def fetch_last_revision_release_date(
     snapshot_id: bytes, storage: StorageInterface
 ) -> Dict[str, str]:
     if not snapshot_id:
         return {}

     snapshot = snapshot_get_all_branches(storage, snapshot_id)
     if not snapshot:
         return {}

     branches = snapshot.branches.values()

     tip_revision_ids = []
     tip_release_ids = []

     for branch in branches:
         if branch.target_type == TargetType.REVISION:
             tip_revision_ids.append(branch.target)
         elif branch.target_type == TargetType.RELEASE:
             tip_release_ids.append(branch.target)

     revision_datetimes = [
         revision.date.to_datetime()
         for revision in storage.revision_get(tip_revision_ids)
         if revision and revision.date
     ]

     release_datetimes = [
         release.date.to_datetime()
         for release in storage.release_get(tip_release_ids)
         if release and release.date
     ]

     ret = {}
     if revision_datetimes:
         ret["last_revision_date"] = max(revision_datetimes).isoformat()
     if release_datetimes:
         ret["last_release_date"] = max(release_datetimes).isoformat()

     return ret


 def process_journal_objects(messages, *, search, storage=None):
     """Worker function for `JournalClient.process(worker_fn)`, after
     currification of `scheduler` and `task_names`."""
     assert set(messages) <= EXPECTED_MESSAGE_TYPES, set(messages)

     if "origin" in messages:
         process_origins(messages["origin"], search)

-    if "origin_visit" in messages:
-        process_origin_visits(messages["origin_visit"], search)
-
     if "origin_visit_status" in messages:
         process_origin_visit_statuses(messages["origin_visit_status"], search, storage)

     if "origin_intrinsic_metadata" in messages:
         process_origin_intrinsic_metadata(messages["origin_intrinsic_metadata"], search)


 def process_origins(origins, search):
     logging.debug("processing origins %r", origins)

     search.origin_update(origins)


-def process_origin_visits(visits, search):
-    logging.debug("processing origin visits %r", visits)
-
-    search.origin_update(
-        [
-            {
-                "url": (
-                    visit["origin"]
-                    if isinstance(visit["origin"], str)
-                    else visit["origin"]["url"]
-                ),
-                "visit_types": [visit["type"]],
-            }
-            for visit in visits
-        ]
-    )
-
-
 def process_origin_visit_statuses(visit_statuses, search, storage):
     logging.debug("processing origin visit statuses %r", visit_statuses)

     def hexify(b: Optional[bytes]) -> Optional[str]:
         if b is None:
             return None
         return b.hex()

-    full_visit_status = [
-        {
+    processed_visit_statuses = []
+    for visit_status in visit_statuses:
+        processed_status = {
             "url": visit_status["origin"],
-            "has_visits": True,
-            "nb_visits": visit_status["visit"],
-            "snapshot_id": hexify(visit_status.get("snapshot")),
-            "last_visit_date": visit_status["date"].isoformat(),
-            "last_eventful_visit_date": visit_status["date"].isoformat(),
-            **fetch_last_revision_release_date(visit_status.get("snapshot"), storage),
+            "visit_types": [visit_status["type"]],
         }
-        for visit_status in visit_statuses
if visit_status["status"] == "full" - ] - - if full_visit_status: - search.origin_update(full_visit_status) + if visit_status["status"] == "full": + processed_status.update( + { + "has_visits": True, + "nb_visits": visit_status["visit"], + "snapshot_id": hexify(visit_status.get("snapshot")), + "last_visit_date": visit_status["date"].isoformat(), + "last_eventful_visit_date": visit_status["date"].isoformat(), + **fetch_last_revision_release_date( + visit_status.get("snapshot"), storage + ), + } + ) + processed_visit_statuses.append(processed_status) + + if processed_visit_statuses: + search.origin_update(processed_visit_statuses) def process_origin_intrinsic_metadata(origin_metadata, search): logging.debug("processing origin intrinsic_metadata %r", origin_metadata) origin_metadata = [ {"url": item["id"], "intrinsic_metadata": item["metadata"],} for item in origin_metadata ] search.origin_update(origin_metadata) diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py index 136c2a7..e237c5f 100644 --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -1,444 +1,394 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy from datetime import datetime, timezone import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import value_to_kafka from swh.model.hashutil import hash_to_bytes from swh.search import get_search from swh.search.cli import search_cli_group CLI_CONFIG = """ search: cls: elasticsearch hosts: - '%(elasticsearch_host)s' indexes: origin: index: test read_alias: test-read write_alias: test-write storage: cls: memory """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, elasticsearch_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write( (CLI_CONFIG + config) % {"elasticsearch_host": elasticsearch_host} ) config_fd.seek(0) result = runner.invoke(search_cli_group, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__origin( swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin producer", "acks": "all", } ) origin_foobar_baz = { "url": "http://foobar.baz", } value = value_to_kafka(origin_foobar_baz) topic = f"{kafka_prefix}.origin" producer.produce(topic=topic, key=b"bogus-origin", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "objects", "--stop-after-objects", "1", "--object-type", "origin", "--prefix", kafka_prefix, ], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # searching origin without visit as requirement actual_page = 
     actual_page = swh_search.origin_search(url_pattern="foobar")

     # We find it
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar_baz]

     # It's an origin with no visit, searching for it with visit
     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)

     # returns nothing
     assert actual_page.next_page_token is None
     assert actual_page.results == []


-def test__journal_client__origin_visit(
-    swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
-):
-    """Tests the re-indexing when origin_batch_size*task_batch_size is a
-    divisor of nb_origins."""
-    origin_foobar = {"url": "http://baz.foobar"}
-    producer = Producer(
-        {
-            "bootstrap.servers": kafka_server,
-            "client.id": "test search origin visit producer",
-            "acks": "all",
-        }
-    )
-    topic = f"{kafka_prefix}.origin_visit"
-    value = value_to_kafka({"origin": origin_foobar["url"], "type": "git"})
-    producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
-
-    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
-        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
-    )
-    result = invoke(
-        False,
-        [
-            "journal-client",
-            "objects",
-            "--stop-after-objects",
-            "1",
-            "--object-type",
-            "origin_visit",
-        ],
-        journal_objects_config,
-        elasticsearch_host=elasticsearch_host,
-    )
-
-    # Check the output
-    expected_output = "Processed 1 messages.\nDone.\n"
-    assert result.exit_code == 0, result.output
-    assert result.output == expected_output
-
-    swh_search.flush()
-
-    actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False)
-    assert actual_page.next_page_token is None
-    assert actual_page.results == [origin_foobar]
-
-    # Not considered visited unless the visit is full
-    actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)
-    assert actual_page.next_page_token is None
-    assert actual_page.results == []
-
-
 def test__journal_client__origin_visit_status(
     swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
 ):
     """Subscribing to origin-visit-status should result in swh-search
     indexation
     """
     origin_foobar = {"url": "http://baz.foobar"}
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test search origin visit status producer",
             "acks": "all",
         }
     )
     topic = f"{kafka_prefix}.origin_visit_status"
     value = value_to_kafka(
         {
             "origin": origin_foobar["url"],
             "visit": 1,
+            "type": "git",
             "date": datetime.now(tz=timezone.utc),
             "snapshot": None,
             "status": "full",
         }
     )
     producer.produce(topic=topic, key=b"bogus-origin-visit-status", value=value)

     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
         broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
     )
     result = invoke(
         False,
         [
             "journal-client",
             "objects",
             "--stop-after-objects",
             "1",
             "--prefix",
             kafka_prefix,
             "--object-type",
             "origin_visit_status",
         ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )

     # Check the output
     expected_output = "Processed 1 messages.\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output

     swh_search.flush()

     # Both search returns the visit
     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False)
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar]

     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar]


 def test__journal_client__origin_intrinsic_metadata(
     swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
 ):
     """Subscribing to origin-intrinsic-metadata should result in swh-search
     indexation
     """
     origin_foobar = {"url": "https://github.com/clojure/clojure"}
     origin_intrinsic_metadata = {
         "id": origin_foobar["url"],
         "metadata": {
             "name": "clojure",
             "type": "SoftwareSourceCode",
             "license": "http://opensource.org/licenses/eclipse-1.0.php",
             "version": "1.10.2-master-SNAPSHOT",
             "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
             "identifier": "org.clojure",
             "description": "Clojure core environment and runtime library.",
             "codeRepository": "https://repo.maven.apache.org/maven2/org/clojure/clojure",  # noqa
         },
         "indexer_configuration_id": 1,
         "from_revision": hash_to_bytes("f47c139e20970ee0852166f48ee2a4626632b86e"),
         "mappings": ["maven"],
     }
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test search origin intrinsic metadata producer",
             "acks": "all",
         }
     )
     topic = f"{kafka_prefix}.origin_intrinsic_metadata"
     value = value_to_kafka(origin_intrinsic_metadata)
     producer.produce(topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value)

     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
         broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
     )
     result = invoke(
         False,
         [
             "journal-client",
             "objects",
             "--stop-after-objects",
             "1",
             "--object-type",
             "origin_intrinsic_metadata",
         ],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )

     # Check the output
     expected_output = "Processed 1 messages.\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output

     swh_search.flush()

     # search without visit returns the metadata
     actual_page = swh_search.origin_search(url_pattern="clojure", with_visit=False)
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar]

     # no visit associated so it does not return anything
     actual_page = swh_search.origin_search(url_pattern="clojure", with_visit=True)
     assert actual_page.next_page_token is None
     assert actual_page.results == []


 def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
     """Missing configuration on journal should raise"""
     with pytest.raises(KeyError, match="journal"):
         invoke(
             catch_exceptions=False,
             args=["journal-client", "objects", "--stop-after-objects", "1",],
             config="",  # missing config will make it raise
             elasticsearch_host=elasticsearch_host,
         )


 def test__journal_client__missing_journal_config_keys(elasticsearch_host):
     """Missing configuration on mandatory journal keys should raise"""
     kafka_prefix = "swh.journal.objects"
     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
         broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer"
     )
     journal_config = yaml.safe_load(journal_objects_config)

     for key in journal_config["journal"].keys():
         if key == "prefix":  # optional
             continue
         cfg = copy.deepcopy(journal_config)
         del cfg["journal"][key]  # make config incomplete
         yaml_cfg = yaml.dump(cfg)

         with pytest.raises(TypeError, match=f"{key}"):
             invoke(
                 catch_exceptions=False,
                 args=[
                     "journal-client",
                     "objects",
                     "--stop-after-objects",
                     "1",
                     "--prefix",
                     kafka_prefix,
                     "--object-type",
                     "origin_visit_status",
                 ],
                 config=yaml_cfg,  # incomplete config will make the cli raise
                 elasticsearch_host=elasticsearch_host,
             )


 def test__journal_client__missing_prefix_config_key(
     swh_search, elasticsearch_host, kafka_server
 ):
     """Missing configuration on mandatory prefix key should raise"""
     journal_cfg_template = """
 journal:
     brokers:
         - {broker}
     group_id: {group_id}
 """
     journal_cfg = journal_cfg_template.format(
         broker=kafka_server, group_id="test-consumer"
     )

     with pytest.raises(ValueError, match="prefix"):
         invoke(
             False,
             # Missing --prefix (and no config key) will make the cli raise
             [
                 "journal-client",
                 "objects",
                 "--stop-after-objects",
                 "1",
                 "--object-type",
                 "origin_visit_status",
             ],
             journal_cfg,
             elasticsearch_host=elasticsearch_host,
         )


 def test__journal_client__missing_object_types_config_key(
     swh_search, elasticsearch_host, kafka_server
 ):
     """Missing configuration on mandatory object-types key should raise"""
     journal_cfg_template = """
 journal:
     brokers:
         - {broker}
     prefix: swh.journal.objects
     group_id: {group_id}
 """
     journal_cfg = journal_cfg_template.format(
         broker=kafka_server, group_id="test-consumer"
     )

     with pytest.raises(ValueError, match="object_types"):
         invoke(
             False,
             # Missing --object-types (and no config key) will make the cli raise
             ["journal-client", "objects", "--stop-after-objects", "1"],
             journal_cfg,
             elasticsearch_host=elasticsearch_host,
         )


 def test__initialize__with_index_name(elasticsearch_host):
     """Initializing the index with an index name should create the right index"""
     search = get_search(
         "elasticsearch",
         hosts=[elasticsearch_host],
         indexes={"origin": {"index": "test"}},
     )
     assert search._get_origin_index() == "test"
     assert search._get_origin_read_alias() == "origin-read"
     assert search._get_origin_write_alias() == "origin-write"


 def test__initialize__with_read_alias(elasticsearch_host):
     """Initializing the index with a search alias name should create the right
     search alias"""
     search = get_search(
         "elasticsearch",
         hosts=[elasticsearch_host],
         indexes={"origin": {"read_alias": "test"}},
     )
     assert search._get_origin_index() == "origin"
     assert search._get_origin_read_alias() == "test"
     assert search._get_origin_write_alias() == "origin-write"


 def test__initialize__with_write_alias(elasticsearch_host):
     """Initializing the index with an indexing alias name should create the right
     indexing alias"""
     search = get_search(
         "elasticsearch",
         hosts=[elasticsearch_host],
         indexes={"origin": {"write_alias": "test"}},
     )
     assert search._get_origin_index() == "origin"
     assert search._get_origin_read_alias() == "origin-read"
     assert search._get_origin_write_alias() == "test"
diff --git a/swh/search/tests/test_journal_client.py b/swh/search/tests/test_journal_client.py
index 0b57d58..c225c55 100644
--- a/swh/search/tests/test_journal_client.py
+++ b/swh/search/tests/test_journal_client.py
@@ -1,306 +1,300 @@
 # Copyright (C) 2019-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 import functools
 from unittest.mock import MagicMock

 import pytest

 from swh.model.model import (
     ObjectType,
     Person,
     Release,
     Revision,
     RevisionType,
     Snapshot,
     SnapshotBranch,
     TargetType,
     Timestamp,
     TimestampWithTimezone,
     hash_to_bytes,
 )
 from swh.search.journal_client import (
     fetch_last_revision_release_date,
     process_journal_objects,
 )
 from swh.storage import get_storage

 DATES = [
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567891, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567892, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567893, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
     TimestampWithTimezone(
         timestamp=Timestamp(seconds=1234567894, microseconds=0,),
         offset=120,
         negative_utc=False,
     ),
 ]

 COMMITTERS = [
     Person(fullname=b"foo", name=b"foo", email=b""),
     Person(fullname=b"bar", name=b"bar", email=b""),
 ]

 REVISIONS = [
     Revision(
         message=b"revision_1_message",
         date=DATES[0],
         committer=COMMITTERS[0],
         author=COMMITTERS[0],
         committer_date=DATES[0],
         type=RevisionType.GIT,
         directory=b"\x01" * 20,
         synthetic=False,
         metadata=None,
         parents=(
             hash_to_bytes("9b918dd063cec85c2bc63cc7f167e29f5894dcbc"),
             hash_to_bytes("757f38bdcd8473aaa12df55357f5e2f1a318e672"),
         ),
     ),
     Revision(
         message=b"revision_2_message",
         date=DATES[1],
         committer=COMMITTERS[1],
         author=COMMITTERS[1],
         committer_date=DATES[1],
         type=RevisionType.MERCURIAL,
         directory=b"\x02" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
         extra_headers=((b"foo", b"bar"),),
     ),
     Revision(
         message=b"revision_3_message",
         date=DATES[2],
         committer=COMMITTERS[0],
         author=COMMITTERS[0],
         committer_date=DATES[2],
         type=RevisionType.GIT,
         directory=b"\x03" * 20,
         synthetic=False,
         metadata=None,
         parents=(),
     ),
 ]

 RELEASES = [
     Release(
         name=b"v0.0.1",
         date=DATES[1],
         author=COMMITTERS[0],
         target_type=ObjectType.REVISION,
         target=b"\x04" * 20,
         message=b"foo",
         synthetic=False,
     ),
     Release(
         name=b"v0.0.2",
         date=DATES[2],
         author=COMMITTERS[1],
         target_type=ObjectType.REVISION,
         target=b"\x05" * 20,
         message=b"bar",
         synthetic=False,
     ),
     Release(
         name=b"v0.0.3",
         date=DATES[3],
         author=COMMITTERS[1],
         target_type=ObjectType.REVISION,
         target=b"\x05" * 20,
         message=b"foobar",
         synthetic=False,
     ),
 ]

 SNAPSHOTS = [
     Snapshot(
         branches={
             b"target/revision1": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id,
             ),
             b"target/revision2": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[1].id,
             ),
             b"target/revision3": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[2].id,
             ),
             b"target/release1": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[0].id
             ),
             b"target/release2": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[1].id
             ),
             b"target/release3": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[2].id
             ),
             b"target/alias": SnapshotBranch(
                 target_type=TargetType.ALIAS, target=b"target/revision1"
             ),
         },
     ),
     Snapshot(
         branches={
             b"target/revision1": SnapshotBranch(
                 target_type=TargetType.REVISION, target=REVISIONS[0].id,
             )
         },
     ),
     Snapshot(
         branches={
             b"target/release1": SnapshotBranch(
                 target_type=TargetType.RELEASE, target=RELEASES[0].id
             )
         },
     ),
     Snapshot(branches={}),
 ]


 @pytest.fixture
 def storage():
     storage = get_storage("memory")
     storage.revision_add(REVISIONS)
     storage.release_add(RELEASES)
     storage.snapshot_add(SNAPSHOTS)
     return storage


 def test_journal_client_origin_from_journal():
     search_mock = MagicMock()

     worker_fn = functools.partial(process_journal_objects, search=search_mock,)
     worker_fn({"origin": [{"url": "http://foobar.baz"},]})
     search_mock.origin_update.assert_called_once_with(
         [{"url": "http://foobar.baz"},]
     )

     search_mock.reset_mock()

     worker_fn({"origin": [{"url": "http://foobar.baz"}, {"url": "http://barbaz.qux"},]})
     search_mock.origin_update.assert_called_once_with(
         [{"url": "http://foobar.baz"}, {"url": "http://barbaz.qux"},]
     )


-def test_journal_client_origin_visit_from_journal():
-    search_mock = MagicMock()
-
-    worker_fn = functools.partial(process_journal_objects, search=search_mock,)
-
-    worker_fn({"origin_visit": [{"origin": "http://foobar.baz", "type": "git"},]})
-    search_mock.origin_update.assert_called_once_with(
-        [{"url": "http://foobar.baz", "visit_types": ["git"]},]
-    )
-
-
 def test_journal_client_origin_visit_status_from_journal(storage):
     search_mock = MagicMock()

     worker_fn = functools.partial(
         process_journal_objects, search=search_mock, storage=storage
     )
     current_datetime = datetime.now(tz=timezone.utc)

     worker_fn(
         {
             "origin_visit_status": [
                 {
                     "origin": "http://foobar.baz",
                     "status": "full",
+                    "type": "git",
                     "visit": 5,
                     "date": current_datetime,
                     "snapshot": SNAPSHOTS[0].id,
                 }  # full visits ok
             ]
         }
     )
     search_mock.origin_update.assert_called_once_with(
         [
             {
                 "url": "http://foobar.baz",
+                "visit_types": ["git"],
                 "has_visits": True,
                 "nb_visits": 5,
                 "snapshot_id": SNAPSHOTS[0].id.hex(),
                 "last_visit_date": current_datetime.isoformat(),
                 "last_eventful_visit_date": current_datetime.isoformat(),
                 "last_revision_date": "2009-02-14T01:31:33+02:00",
                 "last_release_date": "2009-02-14T01:31:34+02:00",
             },
         ]
     )

     search_mock.reset_mock()

-    # non-full visits are filtered out
+    # non-full visits only set the visit_types attribute
     worker_fn(
         {
             "origin_visit_status": [
                 {
                     "origin": "http://foobar.baz",
+                    "type": "git",
                     "status": "partial",
                     "visit": 5,
                     "date": current_datetime,
                 }
             ]
         }
     )
-    search_mock.origin_update.assert_not_called()
+    search_mock.origin_update.assert_called_once_with(
+        [{"url": "http://foobar.baz", "visit_types": ["git"]}]
+    )


 def test_journal_client_origin_metadata_from_journal():
     search_mock = MagicMock()

     worker_fn = functools.partial(process_journal_objects, search=search_mock,)
     worker_fn(
         {
             "origin_intrinsic_metadata": [
                 {
                     "id": "http://foobar.baz",
                     "metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "description": "foo bar",
                         "programmingLanguage": "python",
                         "license": "MIT",
                     },
                 },
             ]
         }
     )
     search_mock.origin_update.assert_called_once_with(
         [
             {
                 "url": "http://foobar.baz",
                 "intrinsic_metadata": {
                     "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                     "description": "foo bar",
                     "programmingLanguage": "python",
                     "license": "MIT",
                 },
             },
         ]
     )


 def test_fetch_last_revision_release_date(storage):
     for snapshot in SNAPSHOTS:
         assert fetch_last_revision_release_date(snapshot.id, storage) is not None
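
Not part of the patch: a minimal sketch of the behaviour introduced above, assuming a hypothetical origin URL and a `MagicMock` standing in for the search backend. After this change, every `origin_visit_status` message contributes at least `url` and `visit_types` to the index, while the visit counters, snapshot id and last-*-date fields are only added when the status is `full`.

```python
from datetime import datetime, timezone
from unittest.mock import MagicMock

from swh.search.journal_client import process_journal_objects

search = MagicMock()

# A non-full (partial) visit status: only "url" and "visit_types"
# reach search.origin_update; the snapshot is never looked up, so no
# storage backend is needed here.
process_journal_objects(
    {
        "origin_visit_status": [
            {
                "origin": "http://example.org/repo.git",  # hypothetical origin
                "type": "git",
                "status": "partial",
                "visit": 1,
                "date": datetime.now(tz=timezone.utc),
            }
        ]
    },
    search=search,
    storage=None,
)

search.origin_update.assert_called_once_with(
    [{"url": "http://example.org/repo.git", "visit_types": ["git"]}]
)
```

With a `"status": "full"` message and a real storage, the same call would also carry `has_visits`, `nb_visits`, `snapshot_id` and the last visit/revision/release dates, as exercised by `test_journal_client_origin_visit_status_from_journal` above.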