diff --git a/PKG-INFO b/PKG-INFO index bf487f9..b24f080 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.5.1 +Version: 0.6.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 0851e17..bcc23b2 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1 @@ -swh.core[db,http] >= 0.0.60 swh.model >= 0.7.2 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index bf487f9..b24f080 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.5.1 +Version: 0.6.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 1906592..c1a5a7d 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,9 +1,8 @@ confluent-kafka msgpack tenacity -swh.core[db,http]>=0.0.60 swh.model>=0.7.2 [testing] pytest hypothesis diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 07ecca7..41c48e8 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,67 +1,123 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime +from enum import Enum from typing import Any, Union import msgpack -from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.model import KeyType +class MsgpackExtTypeCodes(Enum): + LONG_INT = 1 + LONG_NEG_INT = 2 + + +# this as been copied from swh.core.api.serializer +# TODO refactor swh.core to make this function available +def _msgpack_encode_longint(value): + # needed because msgpack will not handle long integers with more than 64 bits + # which we unfortunately happen to have to deal with from time to time + if value > 0: + code = MsgpackExtTypeCodes.LONG_INT.value + else: + code = MsgpackExtTypeCodes.LONG_NEG_INT.value + value = -value + length, rem = divmod(value.bit_length(), 8) + if rem: + length += 1 + return msgpack.ExtType(code, int.to_bytes(value, length, "big")) + + +def msgpack_ext_encode_types(obj): + if isinstance(obj, int): + return _msgpack_encode_longint(obj) + return obj + + +def msgpack_ext_hook(code, data): + if code == MsgpackExtTypeCodes.LONG_INT.value: + return int.from_bytes(data, "big") + if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: + return -int.from_bytes(data, "big") + + raise ValueError("Unknown msgpack extended code %s" % code) + + +# for BW compat +def decode_types_bw(obj): + if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": + return datetime.datetime.fromisoformat(obj[b"d"]) + return obj + + def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" - return msgpack_dumps(value) + return msgpack.packb( + value, + use_bin_type=True, + datetime=True, # encode datetime as msgpack.Timestamp + default=msgpack_ext_encode_types, + ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" - value = msgpack_loads(kafka_value) + value = msgpack.unpackb( + kafka_value, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + timestamp=3, # convert Timestamp in datetime objects (tz UTC) + ) return ensure_tuples(value) def ensure_tuples(value: Any) -> Any: if isinstance(value, (tuple, list)): return tuple(map(ensure_tuples, value)) elif isinstance(value, dict): return dict(ensure_tuples(list(value.items()))) else: return value diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index f2b9671..9420b10 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,69 +1,116 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict +from datetime import datetime, timedelta, timezone import itertools from typing import Iterable +import pytest + from swh.journal import serializers from .conftest import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): """Standard back and forth serialization with keys """ # All KeyType(s) keys: Iterable[serializers.KeyType] = [ {"a": "foo", "b": "bar", "c": "baz",}, {"a": b"foobarbaz",}, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key + + +# limits of supported int values by msgpack +MININT = -(2 ** 63) +MAXINT = 2 ** 64 - 1 + +intvalues = [ + MININT * 2, + MININT - 1, + MININT, + MININT + 1, + -10, + 0, + 10, + MAXINT - 1, + MAXINT, + MAXINT + 1, + MAXINT * 2, +] + + +@pytest.mark.parametrize("value", intvalues) +def test_encode_int(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +datevalues = [ + datetime.now(tz=timezone.utc), + datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), + datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), + datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), +] + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime_bw(value): + bwdate = {b"swhtype": "datetime", b"d": value.isoformat()} + assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py index baecadc..4272834 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -1,42 +1,43 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Manager from typing import Any, Generic, List, Tuple, TypeVar from . import ValueProtocol logger = logging.getLogger(__name__) TValue = TypeVar("TValue", bound=ValueProtocol) class InMemoryJournalWriter(Generic[TValue]): objects: List[Tuple[str, TValue]] privileged_objects: List[Tuple[str, TValue]] def __init__(self, value_sanitizer: Any): # Share the list of objects across processes, for RemoteAPI tests. self.manager = Manager() self.objects = self.manager.list() self.privileged_objects = self.manager.list() def write_addition( self, object_type: str, object_: TValue, privileged: bool = False ) -> None: + object_.unique_key() # Check this does not error, to mimic the kafka writer if privileged: self.privileged_objects.append((object_type, object_)) else: self.objects.append((object_type, object_)) write_update = write_addition def write_additions( self, object_type: str, objects: List[TValue], privileged: bool = False ) -> None: for object_ in objects: self.write_addition(object_type, object_, privileged)