diff --git a/docs/example-journal-client.py b/docs/example-journal-client.py new file mode 100644 --- /dev/null +++ b/docs/example-journal-client.py @@ -0,0 +1,37 @@ +import pprint + + +def process_objects(all_objects): + """Worker function handling incoming objects""" + for (object_type, objects) in all_objects.items(): + for object_ in objects: + print(f"New {object_type} object:") + pprint.pprint(object_) + print() + + +def main(): + from swh.journal.client import get_journal_client + + # Usually read from a config file: + config = { + "brokers": ["localhost:9092"], + "group_id": "my-consumer-group", + "auto_offset_reset": "earliest", + } + + # Initialize the client + client = get_journal_client( + "kafka", object_types=["revision", "release"], privileged=True, **config + ) + + try: + # Run the client forever + client.process(process_objects) + except KeyboardInterrupt: + print("Called Ctrl-C, exiting.") + exit(0) + + +if __name__ == "__main__": + main() diff --git a/docs/index.rst b/docs/index.rst --- a/docs/index.rst +++ b/docs/index.rst @@ -6,10 +6,8 @@ Persistent logger of changes to the archive, with publish-subscribe support. -Reference Documentation ------------------------ - .. toctree:: :maxdepth: 2 + journal-clients /apidoc/swh.journal diff --git a/docs/journal-clients.rst b/docs/journal-clients.rst new file mode 100644 --- /dev/null +++ b/docs/journal-clients.rst @@ -0,0 +1,76 @@ +.. _journal_clients: + +Software Heritage Journal clients +================================= + +Journal clients are processes that read data from the |swh| Journal, +in order to efficiently process all existing objects, and process new objects +as they come. +Some journal clients, such as :ref:`swh-dataset <swh-dataset>`, only read +existing objects and stop when they are done. 
+ +They can run in parallel, and the :mod:`swh.journal.client` module +provides an abstraction handling all the setup, so actual clients are just +a single function that takes :mod:`model objects <swh.model.model>` as parameters. + +For example, a very simple journal client that prints all revisions and releases +to the console can be implemented like this: + +.. literalinclude:: example-journal-client.py + + +Parallelization +--------------- + +A single journal client, like the one above, is sequential. +It can, however, be run concurrently by starting the same program multiple times. +Kafka will coordinate the processes (provided they use the same ``group_id``) so the load is shared across them. + +Authentication +-------------- + +In production, journal clients need credentials to access the journal. +Once you have credentials, they can be configured by adding this to the ``config``:: + + config = { + "sasl.mechanism": "SCRAM-SHA-512", + "security.protocol": "SASL_SSL", + "sasl.username": "<username>", + "sasl.password": "<password>", + } + +There are two types of client: privileged and unprivileged. +The former has access to all the data; the latter gets redacted authorship information, +for privacy reasons. +For unprivileged clients, the ``name`` and ``email`` fields of ``author`` and ``committer`` attributes +of release and revision objects are blank, and their ``fullname`` is a SHA256 hash +of their actual fullname. +The ``privileged`` parameter to ``get_journal_client`` must be set accordingly. + +Order guarantees and replaying +------------------------------ + +The journal client shares the ordering guarantees of Kafka. +The short version is that you should not assume any order unless specified otherwise in +the `Kafka documentation <https://kafka.apache.org/documentation/>`__, +nor that two related objects are sent to the same process. + +We call "replay" any workflow that involves a journal client writing all (or most) +objects to a new database. +This can be either continuous (in effect, this produces a mirror database), +or one-off. 
+ +Either way, particular attention should be given to this lax ordering, as replaying +produces databases that are (temporarily) inconsistent, because some objects may +point to objects that have not been replayed yet. + +For one-off replays, this can be mostly solved by processing objects +in reverse topological order: +as contents don't reference any object, +directories only reference contents and directories, +revisions only reference directories, etc.; +this means that replayers can first process all revisions, then all directories, +then all contents. +This keeps the number of inconsistencies relatively small. + +For continuous replays, replayed databases are eventually consistent.