KubeMQ Elixir SDK

Hex.pmCILicense

Table of Contents

Elixir client SDK for KubeMQ — a Kubernetes-native message queue broker.

Installation

Add kubemq to your list of dependencies in mix.exs:

def deps do
[
{:kubemq, "~> 1.0"}
]
end

For Broadway integration, also add:

{:broadway, "~> 1.0"}

Quick Start

# Start a client
{:ok, client} = KubeMQ.Client.start_link(
address: "localhost:50000",
client_id: "my-app"
)
# Send an event
:ok = KubeMQ.Client.send_event(client, %KubeMQ.Event{
channel: "notifications",
body: "Hello KubeMQ!",
metadata: "greeting"
})
# Subscribe to events
{:ok, sub} = KubeMQ.Client.subscribe_to_events(client, "notifications",
on_event: fn event ->
IO.puts("Received: #{event.body}")
end
)
# Clean up
KubeMQ.Subscription.cancel(sub)
KubeMQ.Client.close(client)

Features

Events (Pub/Sub)

Fire-and-forget messaging with optional consumer groups and wildcard subscriptions.

# Publish
:ok = KubeMQ.Client.send_event(client, %KubeMQ.Event{
channel: "events.orders",
body: Jason.encode!(%{order_id: 123}),
tags: %{"source" => "api"}
})
# Stream publish (high throughput)
{:ok, stream} = KubeMQ.Client.send_event_stream(client)
:ok = KubeMQ.EventStreamHandle.send(stream, %KubeMQ.Event{
channel: "events.orders",
body: "batch-item"
})
KubeMQ.EventStreamHandle.close(stream)
# Subscribe with consumer group
{:ok, sub} = KubeMQ.Client.subscribe_to_events(client, "events.>",
group: "order-processors",
on_event: fn event -> process_event(event) end,
on_error: fn error -> Logger.error("Sub error: #{error.message}") end
)

Events Store (Persistent Pub/Sub)

Persistent events with replay from any position.

# Publish (returns confirmation)
{:ok, result} = KubeMQ.Client.send_event_store(client, %KubeMQ.EventStore{
channel: "audit.logs",
body: Jason.encode!(%{action: "login", user: "alice"})
})
# Subscribe from the beginning
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
start_at: :start_from_first,
on_event: fn event ->
IO.puts("Seq #{event.sequence}: #{event.body}")
end
)
# Replay from a specific sequence
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
start_at: {:start_at_sequence, 100}
)
# Replay events from the last 5 minutes
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
start_at: {:start_at_time_delta, 300_000}
)

Commands (RPC)

Request-response with timeout.

# Send a command
{:ok, response} = KubeMQ.Client.send_command(client, %KubeMQ.Command{
channel: "commands.users",
body: Jason.encode!(%{action: "create", name: "Bob"}),
timeout: 10_000
})
if response.executed do
IO.puts("Command executed at #{response.executed_at}")
end
# Handle commands
{:ok, sub} = KubeMQ.Client.subscribe_to_commands(client, "commands.users",
on_command: fn cmd ->
%KubeMQ.CommandReply{
request_id: cmd.id,
response_to: cmd.reply_channel,
executed: true
}
end
)

Queries (RPC with Cache)

Request-response with optional server-side caching.

# Send a query with caching
{:ok, response} = KubeMQ.Client.send_query(client, %KubeMQ.Query{
channel: "queries.products",
body: Jason.encode!(%{id: 42}),
timeout: 10_000,
cache_key: "product:42",
cache_ttl: 60_000
})
IO.puts("Cache hit: #{response.cache_hit}")
# Handle queries
{:ok, sub} = KubeMQ.Client.subscribe_to_queries(client, "queries.products",
on_query: fn query ->
product = load_product(query.body)
%KubeMQ.QueryReply{
request_id: query.id,
response_to: query.reply_channel,
executed: true,
body: Jason.encode!(product)
}
end
)

Queues

Reliable message queues with at-least-once delivery.

# Send a message
{:ok, result} = KubeMQ.Client.send_queue_message(client, %KubeMQ.QueueMessage{
channel: "queue.tasks",
body: "process-this"
})
# Send with delivery policy
{:ok, result} = KubeMQ.Client.send_queue_message(client, %KubeMQ.QueueMessage{
channel: "queue.tasks",
body: "delayed-task",
policy: %KubeMQ.QueuePolicy{
delay_seconds: 30,
expiration_seconds: 3600,
max_receive_count: 3,
max_receive_queue: "queue.dlq"
}
})
# Poll and acknowledge (Stream API)
{:ok, poll} = KubeMQ.Client.poll_queue(client,
channel: "queue.tasks",
max_items: 10,
wait_timeout: 5_000
)
Enum.each(poll.messages, &process_message/1)
:ok = KubeMQ.PollResponse.ack_all(poll)
# Simple receive (Simple API)
{:ok, result} = KubeMQ.Client.receive_queue_messages(client, "queue.tasks",
max_messages: 5,
wait_timeout: 3_000
)

Channel Management

# Create channels
:ok = KubeMQ.Client.create_events_channel(client, "notifications")
:ok = KubeMQ.Client.create_queues_channel(client, "tasks")
# List channels
{:ok, channels} = KubeMQ.Client.list_queues_channels(client, "")
Enum.each(channels, fn ch ->
IO.puts("#{ch.name} - active: #{ch.is_active}")
end)
# Delete and purge
:ok = KubeMQ.Client.purge_queue_channel(client, "tasks")
:ok = KubeMQ.Client.delete_queues_channel(client, "tasks")

Broadway Producers

Process KubeMQ messages using Broadway data pipelines.

defmodule MyEventPipeline do
use Broadway
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {KubeMQ.Broadway.Events, [
client: MyApp.KubeMQ,
channel: "events.>",
group: "broadway-consumer"
]}
],
processors: [default: [concurrency: 4]],
batchers: [default: [batch_size: 100, batch_timeout: 1_000]]
)
end
@impl true
def handle_message(_processor, message, _context) do
event = message.data
IO.puts("Processing event on #{event.channel}")
message
end
@impl true
def handle_batch(_batcher, messages, _batch_info, _context) do
IO.puts("Batch of #{length(messages)} processed")
messages
end
end

Available producers: KubeMQ.Broadway.Events, KubeMQ.Broadway.EventsStore, KubeMQ.Broadway.Queues.

Configuration

All options are passed to KubeMQ.Client.start_link/1:

OptionTypeDefaultDescription
addressString.t()"localhost:50000"KubeMQ server address (host:port)
client_idString.t()requiredUnique client identifier
auth_tokenString.t()nilStatic JWT/OIDC auth token
credential_providermodule()nilModule implementing KubeMQ.CredentialProvider
tlskeyword()nilTLS/mTLS options (cacertfile, certfile, keyfile)
connection_timeoutpos_integer()10_000Connection timeout (ms)
keepalive_timepos_integer()10_000Keepalive interval (ms, min 5000)
keepalive_timeoutpos_integer()5_000Keepalive timeout (ms)
max_receive_sizepos_integer()4_194_304Max receive message size (bytes)
max_send_sizepos_integer()104_857_600Max send message size (bytes)
default_channelString.t()nilDefault channel for operations
default_cache_ttlpos_integer()900_000Default query cache TTL (ms)
receive_buffer_sizepos_integer()10Subscription receive buffer size
reconnect_buffer_sizepos_integer()1_000Max buffered ops during reconnect
reconnect_policykeyword()see belowReconnection policy
retry_policykeyword()see belowRetry policy for transient errors
nameatom() / {:via, ...}nilOTP process name
on_connected(-> :ok)nilConnection established callback
on_disconnected(-> :ok)nilConnection lost callback
on_reconnecting(-> :ok)nilReconnection attempt callback
on_reconnected(-> :ok)nilReconnection success callback
on_closed(-> :ok)nilConnection closed callback

Reconnect Policy Defaults

reconnect_policy: [
enabled: true,
initial_delay: 1_000,
max_delay: 30_000,
max_attempts: 0, # 0 = unlimited
multiplier: 2.0
]

Retry Policy Defaults

retry_policy: [
max_retries: 3,
initial_delay: 100,
max_delay: 5_000,
multiplier: 2.0
]

Supervision

Add the client to your application supervision tree:

defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
{KubeMQ.Client,
address: "localhost:50000",
client_id: "my-app",
name: MyApp.KubeMQ}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end

Error Handling

All public functions return {:ok, result} | {:error, %KubeMQ.Error{}}. Bang variants (suffixed with !) raise KubeMQ.Error on failure.

case KubeMQ.Client.send_event(client, event) do
:ok -> :ok
{:error, %KubeMQ.Error{code: :validation} = err} ->
Logger.warning("Validation: #{err.message}")
{:error, %KubeMQ.Error{code: :transient, retryable?: true} = err} ->
Logger.error("Transient error (will retry): #{err.message}")
{:error, %KubeMQ.Error{} = err} ->
Logger.error("#{err.code}: #{err.message}")
end
# Or use bang variants for scripts
KubeMQ.Client.send_event!(client, event)

Error Codes

CodeRetryableDescription
:transientYesTemporary network/server issue
:timeoutYesOperation timed out
:throttlingYesRate limited
:authenticationNoInvalid credentials
:authorizationNoInsufficient permissions
:validationNoInvalid input
:not_foundNoResource not found
:fatalNoUnrecoverable error
:client_closedNoClient has been closed
:buffer_fullNoReconnect buffer overflow
:stream_brokenNoStream connection lost

TLS / mTLS

# TLS (server verification only)
{:ok, client} = KubeMQ.Client.start_link(
address: "broker:50000",
client_id: "secure-app",
tls: [cacertfile: "/path/to/ca.pem"]
)
# mTLS (mutual TLS)
{:ok, client} = KubeMQ.Client.start_link(
address: "broker:50000",
client_id: "mtls-app",
tls: [
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem",
verify: :verify_peer
]
)

Examples

See the examples/ directory for 59 runnable .exs scripts covering all messaging patterns. Run any example with:

cd examples/events
elixir basic_pubsub.exs

Documentation

Full API documentation is available on HexDocs.

Additional guides:

Requirements

License

Apache-2.0 — see LICENSE for details.