KafkaEx

KafkaEx is an Elixir client for Apache Kafka. KafkaEx requires Elixir 1.14+ and Erlang OTP 24+. Supported Kafka versions: 0.11.0 and newer (see Supported Kafka Versions).

Project Status

KafkaEx v1.0 is the Kayrock-based release. Scope:

Features

KafkaEx v1.0 uses Kayrock for Kafka protocol serialization and negotiates API versions automatically, so no manual version configuration is needed.

Core Capabilities

Supported Kafka Versions

Quick Start

Installation

Add KafkaEx to your mix.exs dependencies:

def deps do
  [
    {:kafka_ex, "~> 1.0"}
  ]
end

Then run:

mix deps.get

Simple Producer & Consumer

# Start a client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
# Produce a message
{:ok, _metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello")
# Fetch messages from offset 0
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)

Using KafkaEx.API as a Behaviour

For production applications, define a module with the KafkaEx.API behaviour:

defmodule MyApp.Kafka do
  use KafkaEx.API, client: MyApp.KafkaClient
end

# In your application.ex supervision tree:
children = [
  {KafkaEx.API, name: MyApp.KafkaClient, brokers: [{"localhost", 9092}]}
]

# Now call without passing client:
{:ok, _metadata} = MyApp.Kafka.produce_one("my-topic", 0, "hello")
{:ok, result} = MyApp.Kafka.fetch("my-topic", 0, 0)

See KafkaEx.API documentation for the complete API reference.

Configuration

KafkaEx can be configured via config.exs, by passing options directly to KafkaEx.API.start_client/1, or both. Options passed to start_client/1 override the matching config.exs defaults. The broker option key is :brokers; the legacy :uris key is still accepted but deprecated.
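
The precedence rule can be pictured as a keyword-list merge. This is a minimal sketch of the idea, not KafkaEx's actual implementation; the module ConfigMerge and the function effective_opts/2 are hypothetical names used only for illustration.

```elixir
defmodule ConfigMerge do
  @moduledoc "Illustrative only: per-call options win over application defaults."

  # `app_defaults` stands in for values read from config.exs;
  # `call_opts` stands in for options passed to start_client/1.
  def effective_opts(app_defaults, call_opts) do
    # Keyword.merge/2 keeps keys from both lists; on a conflict the
    # value from the second list (call_opts) wins.
    Keyword.merge(app_defaults, call_opts)
  end
end

defaults = [brokers: [{"localhost", 9092}], client_id: "my-app"]
overrides = [brokers: [{"kafka.example.com", 9093}]]

merged = ConfigMerge.effective_opts(defaults, overrides)
IO.inspect(merged)
```

Here :client_id falls through from the defaults, while :brokers comes from the per-call options.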

Basic Configuration

# config/config.exs
config :kafka_ex,
  # List of Kafka brokers
  brokers: [{"localhost", 9092}, {"localhost", 9093}],
  # Client identifier
  client_id: "my-app",
  # Default consumer group
  default_consumer_group: "my-consumer-group",
  # Request timeout (milliseconds)
  sync_timeout: 10_000

SSL/TLS Configuration

config :kafka_ex,
  brokers: [{"kafka.example.com", 9093}],
  use_ssl: true,
  ssl_options: [
    cacertfile: "/path/to/ca-cert.pem",
    certfile: "/path/to/client-cert.pem",
    keyfile: "/path/to/client-key.pem",
    verify: :verify_peer
  ]

Consumer Group Settings

config :kafka_ex,
  default_consumer_group: "my-group",
  # Auto-commit settings
  commit_interval: 5_000,  # Commit every 5 seconds
  commit_threshold: 100,   # Or every 100 messages
  # What to do when no committed offset exists, or the requested offset is
  # out of range. Allowed values:
  #   :latest   - reset to the newest offset, skip backlog (the library
  #               default; matches the Kafka/Java client default)
  #   :earliest - reset to the oldest available offset (common for new
  #               consumer groups that want to read existing data)
  #   :none     - raise instead of guessing (strict; surfaces a
  #               missing/out-of-range offset instead of silently
  #               replaying or skipping data)
  auto_offset_reset: :latest
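
The two auto-commit settings act as an either/or trigger: commit once enough time has passed or once enough messages have accumulated. The following is a rough sketch of that decision, assuming this reading of the comments above; CommitPolicy and its arguments are hypothetical and do not reflect KafkaEx internals.

```elixir
defmodule CommitPolicy do
  @moduledoc "Illustrative only: commit when either limit is reached."

  # All arguments are hypothetical bookkeeping values:
  #   uncommitted - messages processed since the last commit
  #   last_commit - monotonic time (ms) of the last commit
  #   now         - current monotonic time (ms)
  def commit?(uncommitted, last_commit, now, opts) do
    # Fallback values mirror the example config, not verified library defaults.
    interval = Keyword.get(opts, :commit_interval, 5_000)
    threshold = Keyword.get(opts, :commit_threshold, 100)

    # Nothing to commit if no new messages were processed.
    uncommitted > 0 and
      (uncommitted >= threshold or now - last_commit >= interval)
  end
end

opts = [commit_interval: 5_000, commit_threshold: 100]

by_threshold = CommitPolicy.commit?(100, 0, 1_000, opts)
by_interval = CommitPolicy.commit?(3, 0, 6_000, opts)
neither = CommitPolicy.commit?(3, 0, 1_000, opts)
IO.inspect({by_threshold, by_interval, neither})
```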

Advanced tuning

Most users do not need to change these. Shown here with defaults so the knobs are discoverable.

config :kafka_ex,
  # Declare the compression algorithms your app uses so Client.init
  # crashes loudly at boot if the backing optional dep isn't loaded.
  # Default: [] (no validation). Any of :gzip (needs no dep), :snappy
  # (needs :snappyer), :lz4 (needs :lz4b), :zstd (needs :ezstd).
  required_compression: [],
  # Delay before a broker-reconnect retry (ms). Lower values reconnect
  # faster but hammer brokers that are down; higher values smooth
  # flapping at the cost of longer error windows.
  sleep_for_reconnect: 400,
  # Periodic metadata refresh cadence (ms). The client issues a full
  # Metadata request this often to pick up leader elections, new
  # topics, and broker membership changes.
  metadata_update_interval: 30_000,
  # Top-level application supervisor restart intensity: if more than
  # max_restarts restarts occur within max_seconds seconds, the
  # supervisor shuts down. Tuning these trades "restart spam on flaps"
  # against "tolerance for broker brownouts".
  max_restarts: 10,
  max_seconds: 60
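
The max_restarts and max_seconds names match the standard Supervisor options of the same name. The sketch below shows those options on a plain Elixir supervisor, assuming KafkaEx forwards the configured values in a similar way; the :demo_agent child is a throwaway placeholder, not part of KafkaEx.

```elixir
# A throwaway child so the supervisor has something to supervise.
children = [
  %{id: :demo_agent, start: {Agent, :start_link, [fn -> 0 end]}}
]

# max_restarts / max_seconds are standard Supervisor options: more than
# 10 restarts within 60 seconds makes the supervisor itself shut down.
{:ok, sup} =
  Supervisor.start_link(children,
    strategy: :one_for_one,
    max_restarts: 10,
    max_seconds: 60
  )

IO.inspect(Supervisor.count_children(sup))
```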

Compression Configuration

Compression is set per-request, not globally:

# Produce with gzip compression
KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)
# Supported: :none (default), :gzip, :snappy, :lz4, :zstd

For Snappy compression, add to mix.exs:

{:snappyer, "~> 1.2"}

Dynamic Configuration

You can use MFA or anonymous functions for dynamic broker resolution:

# Using MFA tuple
config :kafka_ex,
  brokers: {MyApp.Config, :get_kafka_brokers, []}

# Using anonymous function
config :kafka_ex,
  brokers: fn -> Application.get_env(:my_app, :kafka_brokers) end

See KafkaEx.Config for all available options.
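
To make the three accepted shapes concrete, here is a minimal sketch of how a static list, an MFA tuple, and a zero-arity function could each be normalised to a broker list. BrokerResolver and DemoConfig are hypothetical modules written for this example; KafkaEx's own resolution code may differ.

```elixir
defmodule BrokerResolver do
  @moduledoc "Illustrative only: normalise the three accepted shapes to a list."

  # {Module, :function, args} tuple: call it.
  def resolve({mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, args)
  end

  # Zero-arity anonymous function: call it.
  def resolve(fun) when is_function(fun, 0), do: fun.()

  # Already a static list of {host, port} tuples.
  def resolve(brokers) when is_list(brokers), do: brokers
end

defmodule DemoConfig do
  # Hypothetical stand-in for MyApp.Config.get_kafka_brokers/0.
  def get_kafka_brokers, do: [{"localhost", 9092}]
end

static = BrokerResolver.resolve([{"localhost", 9092}])
from_mfa = BrokerResolver.resolve({DemoConfig, :get_kafka_brokers, []})
from_fun = BrokerResolver.resolve(fn -> [{"localhost", 9093}] end)
IO.inspect({static, from_mfa, from_fun})
```

Resolving lazily like this lets the broker list be looked up when the client starts, for example from an environment variable or service discovery.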

Usage

Basic Producer/Consumer

Producing Messages

# Single message
{:ok, metadata} = KafkaEx.API.produce_one(
  client,
  "my-topic",
  0,             # partition
  "hello world"  # message value
)

# With message key (for partition routing)
{:ok, metadata} = KafkaEx.API.produce_one(
  client,
  "my-topic",
  0,
  "message value",
  key: "user-123"
)

# Batch produce
messages = [
  %{value: "message 1", key: "key1"},
  %{value: "message 2", key: "key2"},
  %{value: "message 3", key: "key3"}
]
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, messages)

Consuming Messages

# Fetch from specific offset
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 100)

result.records
|> Enum.each(fn record ->
  IO.puts("Offset: #{record.offset}, Value: #{record.value}")
end)

# Fetch all messages (earliest to high watermark)
{:ok, result} = KafkaEx.API.fetch_all(client, "my-topic", 0)

Consumer Groups

Consumer groups provide coordinated consumption with automatic partition assignment and offset management.

1. Implement a Consumer

defmodule MyApp.MessageConsumer do
  use KafkaEx.Consumer.GenConsumer
  require Logger

  # Messages are delivered in batches
  def handle_message_set(message_set, state) do
    Enum.each(message_set, fn record ->
      Logger.info("Processing: #{inspect(record.value)}")
      # Process your message here
    end)

    # Commit offsets asynchronously
    {:async_commit, state}
  end
end

Available commit strategies:

2. Add to Supervision Tree

# In your application.ex
def start(_type, _args) do
  children = [
    # Start the consumer group
    %{
      id: MyApp.MessageConsumer,
      start: {
        KafkaEx.Consumer.ConsumerGroup,
        :start_link,
        [
          MyApp.MessageConsumer,  # Your consumer module
          "my-consumer-group",    # Consumer group ID
          ["topic1", "topic2"],   # Topics to consume
          [
            # Optional configuration
            commit_interval: 5_000,
            commit_threshold: 100,
            auto_offset_reset: :earliest
          ]
        ]
      }
    }
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

See KafkaEx.Consumer.GenConsumer for details.

Metadata & Offsets

Topic Metadata

# Get all topics
{:ok, metadata} = KafkaEx.API.metadata(client)

# Get specific topics
{:ok, metadata} = KafkaEx.API.metadata(client, ["topic1", "topic2"])

# Inspect partitions
metadata.topics
|> Enum.each(fn topic ->
  IO.puts("Topic: #{topic.name}, Partitions: #{length(topic.partitions)}")
end)

Offset Operations

# Get latest offset for a partition
{:ok, offset} = KafkaEx.API.latest_offset(client, "my-topic", 0)

# Get earliest offset
{:ok, offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)

# List offsets by timestamp (here: one hour ago, in milliseconds)
timestamp = DateTime.utc_now() |> DateTime.add(-3600, :second) |> DateTime.to_unix(:millisecond)
partition_request = %{partition_num: 0, timestamp: timestamp}
{:ok, offsets} = KafkaEx.API.list_offsets(client, [{"my-topic", [partition_request]}])

# Fetch committed offset for consumer group
partitions = [%{partition_num: 0}]
{:ok, offsets} = KafkaEx.API.fetch_committed_offset(
  client,
  "my-consumer-group",
  "my-topic",
  partitions
)

# Commit offset for consumer group
partitions = [%{partition_num: 0, offset: 100}]
{:ok, result} = KafkaEx.API.commit_offset(
  client,
  "my-consumer-group",
  "my-topic",
  partitions
)

Topic Management

# Create a topic
{:ok, result} = KafkaEx.API.create_topic(
  client,
  "new-topic",
  num_partitions: 3,
  replication_factor: 2,
  config_entries: %{
    "retention.ms" => "86400000",
    "compression.type" => "gzip"
  }
)

# Delete a topic
{:ok, result} = KafkaEx.API.delete_topic(client, "old-topic")

Compression

KafkaEx supports multiple compression formats. Compression is applied per-request:

# Gzip compression (built-in)
{:ok, _} = KafkaEx.API.produce(
  client,
  "my-topic",
  0,
  messages,
  compression: :gzip
)

# Snappy compression (requires the snappyer package)
{:ok, _} = KafkaEx.API.produce(
  client,
  "my-topic",
  0,
  messages,
  compression: :snappy
)

# LZ4 compression (requires the lz4b package, Kafka 0.9.0+)
{:ok, _} = KafkaEx.API.produce(
  client,
  "my-topic",
  0,
  messages,
  compression: :lz4
)

# Zstd compression (requires the ezstd package, Kafka 2.1.0+)
{:ok, _} = KafkaEx.API.produce(
  client,
  "my-topic",
  0,
  messages,
  compression: :zstd
)

Supported Formats:

| Format  | Kafka Version | Dependency Required    |
|---------|---------------|------------------------|
| :gzip   | 0.7.0+        | None (built-in)        |
| :snappy | 0.8.0+        | {:snappyer, "~> 1.2"}  |
| :lz4    | 0.9.0+        | :lz4b (optional dep)   |
| :zstd   | 2.1.0+        | :ezstd (optional dep)  |

Decompression is handled automatically when consuming messages.
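
Gzip needs no extra dependency because the zlib bindings ship with Erlang/OTP. The round trip below is a standalone illustration using :zlib directly; it does not go through KafkaEx and only shows what the codec does to a payload.

```elixir
# A repetitive payload compresses well, which makes the effect visible.
payload = String.duplicate("hello kafka ", 100)

# :zlib ships with Erlang/OTP, so no extra dependency is needed.
compressed = :zlib.gzip(payload)
restored = :zlib.gunzip(compressed)

IO.puts("original: #{byte_size(payload)} bytes, gzipped: #{byte_size(compressed)} bytes")
```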

Authentication (SASL)

KafkaEx supports multiple SASL authentication mechanisms for secure connections to Kafka clusters.

SASL/PLAIN

Simple username/password authentication. Always use with SSL/TLS to protect credentials.

config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  ssl_options: [verify: :verify_peer, cacertfile: "/path/to/ca.pem"],
  sasl: %{
    mechanism: :plain,
    username: "alice",
    password: "secret123"
  }
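
The reason PLAIN must be paired with TLS is visible in the wire format itself. RFC 4616 defines the client message as an optional authorization identity, a NUL byte, the username, another NUL byte, and the password, all unencrypted. The sketch below builds that message by hand; PlainSketch is a hypothetical module and is not how KafkaEx exposes SASL.

```elixir
defmodule PlainSketch do
  @moduledoc "Illustrative only: the RFC 4616 PLAIN message layout."

  # Empty authorization identity, then NUL-separated username and password.
  def initial_response(username, password) do
    <<0, username::binary, 0, password::binary>>
  end
end

msg = PlainSketch.initial_response("alice", "secret123")

# The password appears verbatim in the bytes that go over the wire.
IO.inspect(msg)
```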

SASL/SCRAM

Challenge-response authentication (more secure than PLAIN).

config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  sasl: %{
    mechanism: :scram,
    username: "alice",
    password: "secret123",
    mechanism_opts: %{algo: :sha256}  # or :sha512
  }

OAUTHBEARER

OAuth 2.0 token-based authentication.

config :kafka_ex,
  brokers: [{"kafka.example.com", 9092}],
  use_ssl: true,
  sasl: %{
    mechanism: :oauthbearer,
    mechanism_opts: %{
      token_provider: &MyApp.get_oauth_token/0,
      extensions: %{"traceId" => "optional-data"}
    }
  }

AWS MSK IAM

AWS IAM authentication for Amazon Managed Streaming for Kafka (MSK).

config :kafka_ex,
  brokers: [{"msk-cluster.region.amazonaws.com", 9098}],
  use_ssl: true,
  sasl: %{
    mechanism: :msk_iam,
    mechanism_opts: %{
      region: "us-east-1"
      # Credentials automatically resolved from environment
    }
  }

Authentication Requirements:

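| Mechanism   | Minimum Kafka | SSL Required   | Notes                           |
|-------------|---------------|----------------|---------------------------------|
| PLAIN       | 0.9.0+        | ✅ Yes         | Never use without SSL/TLS       |
| SCRAM       | 0.10.2+       | ⚠️ Recommended | Challenge-response, more secure |
| OAUTHBEARER | 2.0+          | ⚠️ Recommended | Requires token provider         |
| MSK_IAM     | MSK 2.7.1+    | ✅ Yes         | AWS-specific                    |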

See AUTH.md for detailed authentication setup and troubleshooting.

Telemetry & Observability

KafkaEx emits telemetry events for monitoring connections, requests, and consumer operations.

Event Categories

| Category   | Events | Description                                                |
|------------|--------|------------------------------------------------------------|
| Connection | 4      | Connect, disconnect, reconnect, close                      |
| Request    | 4      | Request start/stop/exception, retry                        |
| Produce    | 4      | Produce start/stop/exception, batch metrics                |
| Fetch      | 4      | Fetch start/stop/exception, messages received              |
| Offset     | 4      | Commit/fetch offset operations                             |
| Consumer   | 8      | Group join, sync, heartbeat, rebalance, message processing |
| Metadata   | 4      | Cluster metadata updates                                   |
| SASL Auth  | 6      | PLAIN/SCRAM authentication spans                           |

Example: Attaching a Handler

defmodule MyApp.KafkaTelemetry do
  require Logger

  def attach do
    :telemetry.attach_many(
      "my-kafka-handler",
      [
        [:kafka_ex, :connection, :stop],
        [:kafka_ex, :request, :stop],
        [:kafka_ex, :produce, :stop],
        [:kafka_ex, :fetch, :stop]
      ],
      # Use a remote capture; :telemetry warns about local function captures
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event([:kafka_ex, :connection, :stop], measurements, metadata, _config) do
    Logger.info("Connected to #{metadata.host}:#{metadata.port} in #{measurements.duration / 1_000_000}ms")
  end

  def handle_event([:kafka_ex, :request, :stop], measurements, metadata, _config) do
    Logger.debug("Request #{metadata.api_key} took #{measurements.duration / 1_000_000}ms")
  end

  def handle_event([:kafka_ex, :produce, :stop], measurements, metadata, _config) do
    Logger.info("Produced #{measurements.message_count} messages to #{metadata.topic}")
  end

  def handle_event([:kafka_ex, :fetch, :stop], measurements, metadata, _config) do
    Logger.info("Fetched #{measurements.message_count} messages from #{metadata.topic}")
  end
end

Then in your application startup:

# application.ex
def start(_type, _args) do
  MyApp.KafkaTelemetry.attach()
  # ...
end

See KafkaEx.Telemetry for the complete event reference.
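
The handler above divides duration by 1_000_000 to get milliseconds, which is only right when the value is in nanoseconds. Telemetry spans conventionally report durations in the VM's native time unit, so assuming KafkaEx follows that convention (check the event reference to confirm), System.convert_time_unit/3 is a portable alternative. A standalone sketch that does not depend on KafkaEx or :telemetry:

```elixir
# Time a piece of work the way a telemetry span conventionally does:
# with monotonic time in the VM's native unit.
start = System.monotonic_time()
Process.sleep(20)
duration = System.monotonic_time() - start

# Convert explicitly instead of dividing by a fixed constant, since the
# native unit is platform-dependent.
duration_ms = System.convert_time_unit(duration, :native, :millisecond)
IO.puts("took #{duration_ms}ms")
```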

Error Handling & Resilience

KafkaEx v1.0 includes smart error handling and retry logic for production resilience.

Automatic Retries with Exponential Backoff
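
As a generic illustration of the technique named in this heading, the sketch below computes capped exponential delays and retries a function until it succeeds or attempts run out. The Backoff module, the 100 ms base, and the 5 s cap are all hypothetical; they are not KafkaEx's actual retry parameters.

```elixir
defmodule Backoff do
  @moduledoc "Illustrative only: capped exponential backoff."

  # base_ms and max_ms are hypothetical values, not KafkaEx defaults.
  def delay(attempt, base_ms \\ 100, max_ms \\ 5_000) when attempt >= 0 do
    # Double the delay on each attempt, but never exceed the cap.
    min(base_ms * Integer.pow(2, attempt), max_ms)
  end

  # Retry `fun` until it returns {:ok, _} or attempts run out.
  def retry(fun, max_attempts, attempt \\ 0) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, _} = error when attempt + 1 >= max_attempts ->
        # Out of attempts: hand the last error back to the caller.
        error

      {:error, _} ->
        Process.sleep(delay(attempt))
        retry(fun, max_attempts, attempt + 1)
    end
  end
end

delays = Enum.map(0..6, &Backoff.delay/1)
IO.inspect(delays)
```

Capping the delay keeps a long outage from pushing retries arbitrarily far apart.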

Consumer Group Resilience

Consumer groups handle transient errors gracefully following the Java client pattern (KAFKA-6829):

Safe Produce Retry Policy

Important: Produce requests only retry on leadership errors where we know the message wasn't written. Timeout errors are NOT retried to prevent potential duplicate messages.

For truly idempotent produces, enable enable.idempotence=true on your Kafka cluster (requires Kafka 0.11+).
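
The policy can be read as a small classification function. This is a sketch under the assumption that errors surface as atoms; the atom names are modelled on Kafka's NOT_LEADER_FOR_PARTITION and LEADER_NOT_AVAILABLE error codes, and ProduceRetryPolicy is a hypothetical module, not part of KafkaEx.

```elixir
defmodule ProduceRetryPolicy do
  @moduledoc "Illustrative only: which produce errors are safe to retry."

  # Hypothetical atoms modelled on Kafka's leadership error codes.
  # In these cases the broker rejected the write, so nothing was stored.
  @leadership_errors [:not_leader_for_partition, :leader_not_available]

  def retry?(error) when error in @leadership_errors, do: true

  # A timeout is ambiguous: the broker may have written the message
  # before the response was lost, so retrying could duplicate it.
  def retry?(:timeout), do: false

  # Anything else: be conservative and do not retry.
  def retry?(_other), do: false
end

IO.inspect(ProduceRetryPolicy.retry?(:not_leader_for_partition))
IO.inspect(ProduceRetryPolicy.retry?(:timeout))
```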

SSL/TLS Timeouts (Known Issue)

When using certain versions of OTP, random timeouts can occur with SSL.

Impacted versions:

Solution: Upgrade to OTP 21.3.8.15 or 22.3.2+.

Testing

Unit Tests (No Kafka Required)

Run tests that don't require a live Kafka cluster:

mix test.unit

Integration Tests (Docker Required)

KafkaEx includes a Dockerized test cluster with 3 Kafka brokers configured with different authentication mechanisms:

Ports:

Start the test cluster:

./scripts/docker_up.sh

Run all tests:

# Unit tests
mix test.unit
# Integration tests
mix test.integration
# All tests together
mix test

Run specific test categories:

mix test --only consumer_group
mix test --only produce
mix test --only consume
mix test --only auth

Run SASL tests:

MIX_ENV=test mix test --include sasl

Static Analysis

mix format # Format code
mix format --check-formatted
mix credo --strict # Linting
mix dialyzer # Type checking

Contributing

All contributions are managed through the KafkaEx GitHub repo.

Maintainers

kafka_ex has no tiered maintainer structure — every user of kafka_ex is a maintainer. If you run it in production, triage issues when you hit them, send PRs when you find bugs, and help review others' PRs. The project survives on that.

Historically active contributors you may see around the issues and PRs:

Support scope

v1.0 is the Kayrock-based release. Expect prioritisation rather than blanket coverage.

In scope:

Accepted with best-effort review (PRs welcome):

Explicitly out of scope for v1.x:

Response targets

Security

Do not file public GitHub issues for security vulnerabilities. Use either:

Response within ~2 weeks; disclosure coordination typically 30–90 days. Security fixes land on 1.0.x; 0.15.x receives security-only patches through 2027-04-17; earlier versions are unsupported.

License

KafkaEx is released under the MIT License. See LICENSE for details.