diff --git a/swh/search/journal_client.py b/swh/search/journal_client.py index 6fbd50d..cfbf25a 100644 --- a/swh/search/journal_client.py +++ b/swh/search/journal_client.py @@ -1,134 +1,139 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from typing import Dict +from typing import Dict, Optional from swh.model.model import TargetType from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.storage.interface import StorageInterface EXPECTED_MESSAGE_TYPES = { "origin", "origin_visit", "origin_visit_status", "origin_intrinsic_metadata", } def fetch_last_revision_release_date( snapshot_id: bytes, storage: StorageInterface ) -> Dict[str, str]: if not snapshot_id: return {} snapshot = snapshot_get_all_branches(storage, snapshot_id) if not snapshot: return {} branches = snapshot.branches.values() tip_revision_ids = [] tip_release_ids = [] for branch in branches: if branch.target_type == TargetType.REVISION: tip_revision_ids.append(branch.target) elif branch.target_type == TargetType.RELEASE: tip_release_ids.append(branch.target) revision_datetimes = [ revision.date.to_datetime() for revision in storage.revision_get(tip_revision_ids) if revision and revision.date ] release_datetimes = [ release.date.to_datetime() for release in storage.release_get(tip_release_ids) if release and release.date ] ret = {} if revision_datetimes: ret["last_revision_date"] = max(revision_datetimes).isoformat() if release_datetimes: ret["last_release_date"] = max(release_datetimes).isoformat() return ret def process_journal_objects(messages, *, search, storage=None): """Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`.""" assert set(messages) <= EXPECTED_MESSAGE_TYPES, set(messages) if "origin" in messages: process_origins(messages["origin"], search) if "origin_visit" in messages: process_origin_visits(messages["origin_visit"], search) if "origin_visit_status" in messages: process_origin_visit_statuses(messages["origin_visit_status"], search, storage) if "origin_intrinsic_metadata" in messages: process_origin_intrinsic_metadata(messages["origin_intrinsic_metadata"], search) def process_origins(origins, search): logging.debug("processing origins %r", origins) search.origin_update(origins) def process_origin_visits(visits, search): logging.debug("processing origin visits %r", visits) search.origin_update( [ { "url": ( visit["origin"] if isinstance(visit["origin"], str) else visit["origin"]["url"] ), "visit_types": [visit["type"]], } for visit in visits ] ) def process_origin_visit_statuses(visit_statuses, search, storage): logging.debug("processing origin visit statuses %r", visit_statuses) + def hexify(b: Optional[bytes]) -> Optional[str]: + if b is None: + return None + return b.hex() + full_visit_status = [ { "url": visit_status["origin"], "has_visits": True, "nb_visits": visit_status["visit"], - "snapshot_id": visit_status.get("snapshot"), + "snapshot_id": hexify(visit_status.get("snapshot")), "last_visit_date": visit_status["date"].isoformat(), "last_eventful_visit_date": visit_status["date"].isoformat(), **fetch_last_revision_release_date(visit_status.get("snapshot"), storage), } for visit_status in visit_statuses if visit_status["status"] == "full" ] if full_visit_status: search.origin_update(full_visit_status) def process_origin_intrinsic_metadata(origin_metadata, search): logging.debug("processing origin intrinsic_metadata %r", origin_metadata) origin_metadata = [ {"url": item["id"], "intrinsic_metadata": item["metadata"],} for item in origin_metadata ] search.origin_update(origin_metadata) diff --git a/swh/search/tests/test_journal_client.py b/swh/search/tests/test_journal_client.py index 563b43f..0b57d58 100644 --- a/swh/search/tests/test_journal_client.py +++ b/swh/search/tests/test_journal_client.py @@ -1,306 +1,306 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import functools from unittest.mock import MagicMock import pytest from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, Snapshot, SnapshotBranch, TargetType, Timestamp, TimestampWithTimezone, hash_to_bytes, ) from swh.search.journal_client import ( fetch_last_revision_release_date, process_journal_objects, ) from swh.storage import get_storage DATES = [ TimestampWithTimezone( timestamp=Timestamp(seconds=1234567891, microseconds=0,), offset=120, negative_utc=False, ), TimestampWithTimezone( timestamp=Timestamp(seconds=1234567892, microseconds=0,), offset=120, negative_utc=False, ), TimestampWithTimezone( timestamp=Timestamp(seconds=1234567893, microseconds=0,), offset=120, negative_utc=False, ), TimestampWithTimezone( timestamp=Timestamp(seconds=1234567894, microseconds=0,), offset=120, negative_utc=False, ), ] COMMITTERS = [ Person(fullname=b"foo", name=b"foo", email=b""), Person(fullname=b"bar", name=b"bar", email=b""), ] REVISIONS = [ Revision( message=b"revision_1_message", date=DATES[0], committer=COMMITTERS[0], author=COMMITTERS[0], committer_date=DATES[0], type=RevisionType.GIT, directory=b"\x01" * 20, synthetic=False, metadata=None, parents=( hash_to_bytes("9b918dd063cec85c2bc63cc7f167e29f5894dcbc"), hash_to_bytes("757f38bdcd8473aaa12df55357f5e2f1a318e672"), ), ), Revision( message=b"revision_2_message", date=DATES[1], committer=COMMITTERS[1], author=COMMITTERS[1], committer_date=DATES[1], type=RevisionType.MERCURIAL, directory=b"\x02" * 20, synthetic=False, metadata=None, parents=(), extra_headers=((b"foo", b"bar"),), ), Revision( message=b"revision_3_message", date=DATES[2], committer=COMMITTERS[0], author=COMMITTERS[0], committer_date=DATES[2], type=RevisionType.GIT, directory=b"\x03" * 20, synthetic=False, metadata=None, parents=(), ), ] RELEASES = [ Release( name=b"v0.0.1", date=DATES[1], author=COMMITTERS[0], target_type=ObjectType.REVISION, target=b"\x04" * 20, message=b"foo", synthetic=False, ), Release( name=b"v0.0.2", date=DATES[2], author=COMMITTERS[1], target_type=ObjectType.REVISION, target=b"\x05" * 20, message=b"bar", synthetic=False, ), Release( name=b"v0.0.3", date=DATES[3], author=COMMITTERS[1], target_type=ObjectType.REVISION, target=b"\x05" * 20, message=b"foobar", synthetic=False, ), ] SNAPSHOTS = [ Snapshot( branches={ b"target/revision1": SnapshotBranch( target_type=TargetType.REVISION, target=REVISIONS[0].id, ), b"target/revision2": SnapshotBranch( target_type=TargetType.REVISION, target=REVISIONS[1].id, ), b"target/revision3": SnapshotBranch( target_type=TargetType.REVISION, target=REVISIONS[2].id, ), b"target/release1": SnapshotBranch( target_type=TargetType.RELEASE, target=RELEASES[0].id ), b"target/release2": SnapshotBranch( target_type=TargetType.RELEASE, target=RELEASES[1].id ), b"target/release3": SnapshotBranch( target_type=TargetType.RELEASE, target=RELEASES[2].id ), b"target/alias": SnapshotBranch( target_type=TargetType.ALIAS, target=b"target/revision1" ), }, ), Snapshot( branches={ b"target/revision1": SnapshotBranch( target_type=TargetType.REVISION, target=REVISIONS[0].id, ) }, ), Snapshot( branches={ b"target/release1": SnapshotBranch( target_type=TargetType.RELEASE, target=RELEASES[0].id ) }, ), Snapshot(branches={}), ] @pytest.fixture def storage(): storage = get_storage("memory") storage.revision_add(REVISIONS) storage.release_add(RELEASES) storage.snapshot_add(SNAPSHOTS) return storage def test_journal_client_origin_from_journal(): search_mock = MagicMock() worker_fn = functools.partial(process_journal_objects, search=search_mock,) worker_fn({"origin": [{"url": "http://foobar.baz"},]}) search_mock.origin_update.assert_called_once_with( [{"url": "http://foobar.baz"},] ) search_mock.reset_mock() worker_fn({"origin": [{"url": "http://foobar.baz"}, {"url": "http://barbaz.qux"},]}) search_mock.origin_update.assert_called_once_with( [{"url": "http://foobar.baz"}, {"url": "http://barbaz.qux"},] ) def test_journal_client_origin_visit_from_journal(): search_mock = MagicMock() worker_fn = functools.partial(process_journal_objects, search=search_mock,) worker_fn({"origin_visit": [{"origin": "http://foobar.baz", "type": "git"},]}) search_mock.origin_update.assert_called_once_with( [{"url": "http://foobar.baz", "visit_types": ["git"]},] ) def test_journal_client_origin_visit_status_from_journal(storage): search_mock = MagicMock() worker_fn = functools.partial( process_journal_objects, search=search_mock, storage=storage ) current_datetime = datetime.now(tz=timezone.utc) worker_fn( { "origin_visit_status": [ { "origin": "http://foobar.baz", "status": "full", "visit": 5, "date": current_datetime, "snapshot": SNAPSHOTS[0].id, } # full visits ok ] } ) search_mock.origin_update.assert_called_once_with( [ { "url": "http://foobar.baz", "has_visits": True, "nb_visits": 5, - "snapshot_id": SNAPSHOTS[0].id, + "snapshot_id": SNAPSHOTS[0].id.hex(), "last_visit_date": current_datetime.isoformat(), "last_eventful_visit_date": current_datetime.isoformat(), "last_revision_date": "2009-02-14T01:31:33+02:00", "last_release_date": "2009-02-14T01:31:34+02:00", }, ] ) search_mock.reset_mock() # non-full visits are filtered out worker_fn( { "origin_visit_status": [ { "origin": "http://foobar.baz", "status": "partial", "visit": 5, "date": current_datetime, } ] } ) search_mock.origin_update.assert_not_called() def test_journal_client_origin_metadata_from_journal(): search_mock = MagicMock() worker_fn = functools.partial(process_journal_objects, search=search_mock,) worker_fn( { "origin_intrinsic_metadata": [ { "id": "http://foobar.baz", "metadata": { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "description": "foo bar", "programmingLanguage": "python", "license": "MIT", }, }, ] } ) search_mock.origin_update.assert_called_once_with( [ { "url": "http://foobar.baz", "intrinsic_metadata": { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "description": "foo bar", "programmingLanguage": "python", "license": "MIT", }, }, ] ) def test_fetch_last_revision_release_date(storage): for snapshot in SNAPSHOTS: assert fetch_last_revision_release_date(snapshot.id, storage) is not None