diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index 30a5796..6f32343 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,121 +1,127 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, Union, overload
 
 import msgpack
 
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )
 
 
 ModelObject = Union[
     Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot
 ]
 
 KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
 
 
 # these @overload'ed versions of the object_key method aim at helping mypy figuring
 # the correct type-ing.
 @overload
 def object_key(
     object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
 ) -> bytes:
     ...
 
 
 @overload
 def object_key(
     object_type: str, object_: Union[Origin, SkippedContent]
 ) -> Dict[str, bytes]:
     ...
 
 
 @overload
 def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]:
     ...
 
 
 def object_key(object_type: str, object_) -> KeyType:
     if object_type in ("revision", "release", "directory", "snapshot"):
         return object_.id
     elif object_type == "content":
         return object_.sha1  # TODO: use a dict of hashes
     elif object_type == "skipped_content":
         return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
     elif object_type == "origin":
         return {"url": object_.url}
     elif object_type == "origin_visit":
         return {
             "origin": object_.origin,
             "date": str(object_.date),
         }
+    elif object_type == "origin_visit_status":
+        return {
+            "origin": object_.origin,
+            "visit": str(object_.visit),
+            "date": str(object_.date),
+        }
     else:
         raise ValueError("Unknown object type: %s." % object_type)
 
 
 def stringify_key_item(k: str, v: Union[str, bytes]) -> str:
     """Turn the item of a dict key into a string"""
     if isinstance(v, str):
         return v
     if k == "url":
         return v.decode("utf-8")
     return v.hex()
 
 
 def pprint_key(key: KeyType) -> str:
     """Pretty-print a kafka key"""
     if isinstance(key, dict):
         return "{%s}" % ", ".join(
             f"{k}: {stringify_key_item(k, v)}" for k, v in key.items()
         )
     elif isinstance(key, bytes):
         return key.hex()
     else:
         return key
 
 
 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)
 
 
 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
     return msgpack.loads(kafka_key, raw=False)
 
 
 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)
 
 
 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
     value = msgpack_loads(kafka_value)
     if isinstance(value, list):
         return tuple(value)
     if isinstance(value, dict):
         return ensure_tuples(value)
     return value
 
 
 def ensure_tuples(value: Dict) -> Dict:
     return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()}
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index 7759295..dbf1d0c 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,276 +1,287 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import copy
 import datetime
 
 from typing import Any, Dict, List, Type
 
 from swh.model.hashutil import MultiHash, hash_to_bytes
 
 from swh.journal.serializers import ModelObject
 from swh.model.model import (
     BaseModel,
     Content,
     Directory,
     Origin,
     OriginVisit,
+    OriginVisitStatus,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )
 
 
 OBJECT_TYPES: Dict[Type[BaseModel], str] = {
     Content: "content",
     Directory: "directory",
     Origin: "origin",
     OriginVisit: "origin_visit",
+    OriginVisitStatus: "origin_visit_status",
     Release: "release",
     Revision: "revision",
     SkippedContent: "skipped_content",
     Snapshot: "snapshot",
 }
 
 UTC = datetime.timezone.utc
 
 CONTENTS = [
     {
         **MultiHash.from_data(f"foo{i}".encode()).digest(),
         "length": 4,
         "status": "visible",
     }
     for i in range(10)
 ]
 
 SKIPPED_CONTENTS = [
     {
         **MultiHash.from_data(f"bar{i}".encode()).digest(),
         "length": 4,
         "status": "absent",
         "reason": f"because chr({i}) != '*'",
     }
     for i in range(2)
 ]
 
 duplicate_content1 = {
     "length": 4,
     "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     "sha1_git": b"another-foo",
     "blake2s256": b"another-bar",
     "sha256": b"another-baz",
     "status": "visible",
 }
 
 # Craft a sha1 collision
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1["sha1_git"])
 sha1_array[0] += 1
 duplicate_content2["sha1_git"] = bytes(sha1_array)
 
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 
 COMMITTERS = [
     {"fullname": b"foo", "name": b"foo", "email": b"",},
     {"fullname": b"bar", "name": b"bar", "email": b"",},
 ]
 
 DATES = [
     {
         "timestamp": {"seconds": 1234567891, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
     {
         "timestamp": {"seconds": 1234567892, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
 ]
 
 REVISIONS = [
     {
         "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
         "message": b"hello",
         "date": DATES[0],
         "committer": COMMITTERS[0],
         "author": COMMITTERS[0],
         "committer_date": DATES[0],
         "type": "git",
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": (),
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
         "message": b"hello again",
         "date": DATES[1],
         "committer": COMMITTERS[1],
         "author": COMMITTERS[1],
         "committer_date": DATES[1],
         "type": "hg",
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": (),
     },
 ]
 
 RELEASES = [
     {
         "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         "name": b"v0.0.1",
         "date": {
             "timestamp": {"seconds": 1234567890, "microseconds": 0,},
             "offset": 120,
             "negative_utc": False,
         },
         "author": COMMITTERS[0],
         "target_type": "revision",
         "target": b"\x04" * 20,
         "message": b"foo",
         "synthetic": False,
     },
 ]
 
 ORIGINS = [
     {"url": "https://somewhere.org/den/fox",},
     {"url": "https://overtherainbow.org/fox/den",},
 ]
 
 ORIGIN_VISITS = [
     {
         "origin": ORIGINS[0]["url"],
         "date": datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC),
         "snapshot": None,
         "status": "ongoing",
         "metadata": {"foo": "bar"},
         "type": "git",
         "visit": 1,
     },
     {
         "origin": ORIGINS[1]["url"],
         "date": datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC),
         "snapshot": None,
         "status": "ongoing",
         "metadata": {"baz": "qux"},
         "type": "hg",
         "visit": 1,
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         "snapshot": None,
         "status": "ongoing",
         "metadata": {"baz": "qux"},
         "type": "git",
         "visit": 2,
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC),
         "snapshot": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
         "status": "full",
         "metadata": {"baz": "qux"},
         "type": "git",
         "visit": 3,
     },
     {
         "origin": ORIGINS[1]["url"],
         "date": datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC),
         "snapshot": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
         "status": "partial",
         "metadata": {"something": "wrong occurred"},
         "type": "hg",
         "visit": 2,
     },
 ]
 
+ORIGIN_VISIT_STATUSES = []
+for visit in ORIGIN_VISITS:
+    visit_status = copy.deepcopy(visit)
+    visit_status.pop("type")
+    ORIGIN_VISIT_STATUSES.append(visit_status)
+
+
 DIRECTORIES = [
     {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()},
     {
         "id": hash_to_bytes("cc13247a0d6584f297ca37b5868d2cbd242aea03"),
         "entries": (
             {
                 "name": b"file1.ext",
                 "perms": 0o644,
                 "type": "file",
                 "target": CONTENTS[0]["sha1_git"],
             },
             {
                 "name": b"dir1",
                 "perms": 0o755,
                 "type": "dir",
                 "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
             },
             {
                 "name": b"subprepo1",
                 "perms": 0o160000,
                 "type": "rev",
                 "target": REVISIONS[1]["id"],
             },
         ),
     },
 ]
 
 SNAPSHOTS = [
     {
         "id": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
         "branches": {
             b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]}
         },
     },
     {
         "id": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
         "branches": {
             b"target/revision": {
                 "target_type": "revision",
                 "target": REVISIONS[0]["id"],
             },
             b"target/alias": {"target_type": "alias", "target": b"target/revision"},
             b"target/directory": {
                 "target_type": "directory",
                 "target": DIRECTORIES[0]["id"],
             },
             b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]},
             b"target/snapshot": {
                 "target_type": "snapshot",
                 "target": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
             },
         },
     },
 ]
 
 
 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
     "directory": DIRECTORIES,
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
+    "origin_visit_status": ORIGIN_VISIT_STATUSES,
     "release": RELEASES,
     "revision": REVISIONS,
     "snapshot": SNAPSHOTS,
     "skipped_content": SKIPPED_CONTENTS,
 }
 
 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
 
 TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
 
 for object_type, objects in TEST_OBJECT_DICTS.items():
     converted_objects: List[ModelObject] = []
     model = MODEL_OBJECTS[object_type]
 
     for (num, obj_d) in enumerate(objects):
         if object_type == "content":
             obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now(tz=UTC)}
 
         converted_objects.append(model.from_dict(obj_d))
 
     TEST_OBJECTS[object_type] = converted_objects
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
index 69152a3..68d26f0 100644
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -1,67 +1,68 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Iterator
 
 from confluent_kafka.admin import AdminClient
 
 
 def test_kafka_server(kafka_server_base: str):
     ip, port_str = kafka_server_base.split(":")
     assert ip == "127.0.0.1"
     assert int(port_str)
 
     admin = AdminClient({"bootstrap.servers": kafka_server_base})
 
     topics = admin.list_topics()
 
     assert len(topics.brokers) == 1
 
 
 def test_kafka_server_with_topics(
     kafka_server: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
     admin = AdminClient({"bootstrap.servers": kafka_server})
 
     # check unprivileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}.")
     }
     assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
 
     # check privileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}_privileged.")
     }
     assert topics == {
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     }
 
 
 def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
     assert test_config == {
         "consumer_id": "swh.journal.consumer",
         "stop_after_objects": 1,
         "storage": {"cls": "memory", "args": {}},
         "object_types": {
             "content",
             "directory",
             "origin",
             "origin_visit",
+            "origin_visit_status",
             "release",
             "revision",
             "snapshot",
             "skipped_content",
         },
         "privileged_object_types": {"release", "revision",},
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }