
Klife

Klife is a modern, ergonomic and high-performance Kafka client built from the ground up with minimal dependencies.

Key Features

Installation and Base setup

0. Add klife to your list of dependencies in mix.exs:

def deps do
  [
    {:klife, "~> 1.0"}
  ]
end

1. Define your application client

defmodule MyApp.Client do
  use Klife.Client, otp_app: :my_app
end

2. Add basic configuration

config :my_app, MyApp.Client,
  connection: [
    bootstrap_servers: ["localhost:19092", "localhost:29092"],
    ssl: false
  ]

Check out the Klife.Client docs for more details regarding configurations!
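If the broker addresses differ per environment, the same configuration can be built from an environment variable. This is only a sketch: it assumes the client reads its configuration at startup (so that config/runtime.exs applies), and KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name, not something Klife defines. Only the options already shown above are used.

```elixir
# config/runtime.exs
import Config

# KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name holding a
# comma-separated list of host:port pairs. The default mirrors the
# static example above.
servers =
  "KAFKA_BOOTSTRAP_SERVERS"
  |> System.get_env("localhost:19092,localhost:29092")
  |> String.split(",", trim: true)

config :my_app, MyApp.Client,
  connection: [
    bootstrap_servers: servers,
    ssl: false
  ]
```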

3. Add the client to the supervision tree

children = [ MyApp.Client ]

opts = [strategy: :one_for_one, name: Example.Supervisor]
Supervisor.start_link(children, opts)

Produce records

With the setup above in place, you just need to call your client's produce API!

my_rec = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
{:ok, %Klife.Record{}} = MyApp.Client.produce(my_rec)

# Batch produce across multiple topics and partitions in a single call
my_recs = [
  %Klife.Record{value: "val_1", topic: "my_topic_1"},
  %Klife.Record{value: "val_2", topic: "my_topic_2"}
]
[{:ok, %Klife.Record{}}, {:ok, %Klife.Record{}}] = MyApp.Client.produce_batch(my_recs)

Check out the Klife.Client docs for more details regarding the produce API and the Klife.Producer docs for configuration options!
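Because a batch call returns one result per record, you will often want to separate the records that were acknowledged from the ones that were not. The helper below is a hypothetical sketch, not part of Klife: it treats any entry that is not an {:ok, record} tuple as a failure, and the {:error, :some_reason} shape in the sample data is an assumption made for illustration only. Plain maps stand in for %Klife.Record{} so the snippet runs without the library.

```elixir
defmodule BatchResult do
  # Hypothetical helper, not part of Klife.
  # Splits a produce_batch-style result list into the acknowledged
  # records and every entry that is not an {:ok, record} tuple.
  def split(results) when is_list(results) do
    {oks, failures} = Enum.split_with(results, &match?({:ok, _}, &1))
    {Enum.map(oks, fn {:ok, rec} -> rec end), failures}
  end
end

# Sample data: maps stand in for %Klife.Record{}, and the error tuple
# shape is assumed, not taken from the Klife docs.
results = [
  {:ok, %{topic: "my_topic_1", value: "val_1"}},
  {:error, :some_reason},
  {:ok, %{topic: "my_topic_2", value: "val_2"}}
]

{acked, failed} = BatchResult.split(results)
IO.inspect(length(acked), label: "acknowledged")
IO.inspect(failed, label: "failed")
```

In a real application you would pass the return value of MyApp.Client.produce_batch/1 to the helper and decide per failed entry whether to retry or report it.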

Consumer Group

With the setup above in place, you just need to define a Consumer Group module:

defmodule MyApp.MyConsumerGroup do
  use Klife.Consumer.ConsumerGroup,
    client: MyApp.Client,
    group_name: "my_group_name",
    topics: [
      [name: "my_topic_1"],
      [name: "my_topic_2"]
    ]

  @impl true
  def handle_record_batch(_topic, _partition, _cg_name, record_list) do
    # Log each record, then return a {:commit, rec} tuple for it
    Enum.map(record_list, fn %Klife.Record{} = rec ->
      IO.puts("Consuming record with offset #{rec.offset} and value #{rec.value}!")
      {:commit, rec}
    end)
  end
end

Then start it under your supervision tree!

children = [ MyApp.MyConsumerGroup ]

opts = [strategy: :one_for_one, name: Example.Supervisor]
Supervisor.start_link(children, opts)

Check out the Klife.Consumer.ConsumerGroup docs for more details regarding the consumer group behaviour and configuration options!

Reliability

Klife has been validated through thousands of randomized simulation runs using a dedicated chaos testing framework. Each simulation generates a unique scenario by randomizing every available configuration option, drawing from a vast configuration space.

During each run, the simulator continuously injects failures:

Throughout all of this, the framework asserts three core invariants that must never be violated:

  1. No duplicate consumption: Every record is delivered exactly once per consumer group (this can only be guaranteed on runs without injected failures)
  2. Ordered delivery: Offsets within a partition are always consumed in order
  3. No data loss: Every produced record is eventually consumed by all subscribed groups
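
To make the three invariants concrete, here is a minimal sketch of how they could be checked over a log of consumed records. It is not the simulator's actual code: the module name, the function names and the {partition, offset} tuple representation are all assumptions chosen for illustration, and the checks cover a single topic and a single consumer group.

```elixir
defmodule InvariantSketch do
  # Hypothetical checker, not the simulator's actual code.
  # `produced` and `consumed` are lists of {partition, offset} tuples;
  # `consumed` is in the order one consumer group saw the records.

  # Invariant 1: no {partition, offset} pair is consumed twice.
  def no_duplicates?(consumed) do
    length(Enum.uniq(consumed)) == length(consumed)
  end

  # Invariant 2: within each partition, offsets never decrease.
  def ordered?(consumed) do
    consumed
    |> Enum.group_by(fn {partition, _} -> partition end, fn {_, offset} -> offset end)
    |> Enum.all?(fn {_partition, offsets} -> offsets == Enum.sort(offsets) end)
  end

  # Invariant 3: every produced record appears in the consumed log.
  def no_loss?(produced, consumed) do
    MapSet.subset?(MapSet.new(produced), MapSet.new(consumed))
  end
end

produced = [{0, 0}, {0, 1}, {1, 0}]
consumed = [{0, 0}, {1, 0}, {0, 1}]

IO.inspect(InvariantSketch.no_duplicates?(consumed), label: "no duplicates")
IO.inspect(InvariantSketch.ordered?(consumed), label: "ordered")
IO.inspect(InvariantSketch.no_loss?(produced, consumed), label: "no loss")
```

Note that records from different partitions may interleave freely in the consumed log; ordering is only checked per partition, which matches how the second invariant is stated.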

The full simulator source code is available in the simulator directory.

Upgrade from 0.x to 1.x

Kafka versions support

The focus of Klife is to support the most recent versions of Kafka (i.e. >= 4.0); all development and testing was performed almost exclusively on these versions.

That said, it is possible to use some features of Klife on older versions. The consumer group feature will not work on older brokers, but produce-only workflows may still work fine on Kafka versions >= 2.4.

During startup, Klife will emit warning logs for any features that were disabled due to lack of compatibility with the broker. The logs follow the format: [Name of the feature] feature disabled due....

If you attempt to use a disabled feature at runtime, Klife will raise with a descriptive error message.