KubeMQ Elixir SDK

Hex.pmCILicense

Table of Contents

Elixir client SDK for KubeMQ — a Kubernetes-native message queue broker.

Installation

Add kubemq to your list of dependencies in mix.exs:

def deps do
  [
    {:kubemq, "~> 1.0"}
  ]
end

For Broadway integration, also add:

{:broadway, "~> 1.0"}

Quick Start

# Start a client
{:ok, client} = KubeMQ.Client.start_link(
  address: "localhost:50000",
  client_id: "my-app"
)

# Send an event
:ok = KubeMQ.Client.send_event(client, %KubeMQ.Event{
  channel: "notifications",
  body: "Hello KubeMQ!",
  metadata: "greeting"
})

# Subscribe to events
{:ok, sub} = KubeMQ.Client.subscribe_to_events(client, "notifications",
  on_event: fn event ->
    IO.puts("Received: #{event.body}")
  end
)

# Clean up
KubeMQ.Subscription.cancel(sub)
KubeMQ.Client.close(client)

Features

Events (Pub/Sub)

Fire-and-forget messaging with optional consumer groups and wildcard subscriptions.

# Publish
:ok = KubeMQ.Client.send_event(client, %KubeMQ.Event{
  channel: "events.orders",
  body: Jason.encode!(%{order_id: 123}),
  tags: %{"source" => "api"}
})

# Stream publish (high throughput)
{:ok, stream} = KubeMQ.Client.send_event_stream(client)
:ok = KubeMQ.EventStreamHandle.send(stream, %KubeMQ.Event{
  channel: "events.orders",
  body: "batch-item"
})
KubeMQ.EventStreamHandle.close(stream)

# Subscribe with consumer group
{:ok, sub} = KubeMQ.Client.subscribe_to_events(client, "events.>",
  group: "order-processors",
  on_event: fn event -> process_event(event) end,
  on_error: fn error -> Logger.error("Sub error: #{error.message}") end
)

Events Store (Persistent Pub/Sub)

Persistent events with replay from any position.

# Publish (returns confirmation)
{:ok, result} = KubeMQ.Client.send_event_store(client, %KubeMQ.EventStore{
  channel: "audit.logs",
  body: Jason.encode!(%{action: "login", user: "alice"})
})

# Subscribe from the beginning
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
  start_at: :start_from_first,
  on_event: fn event ->
    IO.puts("Seq #{event.sequence}: #{event.body}")
  end
)

# Replay from a specific sequence
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
  start_at: {:start_at_sequence, 100}
)

# Replay events from the last 5 minutes
{:ok, sub} = KubeMQ.Client.subscribe_to_events_store(client, "audit.logs",
  start_at: {:start_at_time_delta, 300_000}
)

Commands (RPC)

Request-response with timeout.

# Send a command
{:ok, response} = KubeMQ.Client.send_command(client, %KubeMQ.Command{
  channel: "commands.users",
  body: Jason.encode!(%{action: "create", name: "Bob"}),
  timeout: 10_000
})

if response.executed do
  IO.puts("Command executed at #{response.executed_at}")
end

# Handle commands
{:ok, sub} = KubeMQ.Client.subscribe_to_commands(client, "commands.users",
  on_command: fn cmd ->
    %KubeMQ.CommandReply{
      request_id: cmd.id,
      response_to: cmd.reply_channel,
      executed: true
    }
  end
)

Queries (RPC with Cache)

Request-response with optional server-side caching.

# Send a query with caching
{:ok, response} = KubeMQ.Client.send_query(client, %KubeMQ.Query{
  channel: "queries.products",
  body: Jason.encode!(%{id: 42}),
  timeout: 10_000,
  cache_key: "product:42",
  cache_ttl: 60_000
})

IO.puts("Cache hit: #{response.cache_hit}")

# Handle queries
{:ok, sub} = KubeMQ.Client.subscribe_to_queries(client, "queries.products",
  on_query: fn query ->
    product = load_product(query.body)
    %KubeMQ.QueryReply{
      request_id: query.id,
      response_to: query.reply_channel,
      executed: true,
      body: Jason.encode!(product)
    }
  end
)

Queues

Reliable message queues with at-least-once delivery.

# Send a message
{:ok, result} = KubeMQ.Client.send_queue_message(client, %KubeMQ.QueueMessage{
  channel: "queue.tasks",
  body: "process-this"
})

# Send with delivery policy
{:ok, result} = KubeMQ.Client.send_queue_message(client, %KubeMQ.QueueMessage{
  channel: "queue.tasks",
  body: "delayed-task",
  policy: %KubeMQ.QueuePolicy{
    delay_seconds: 30,
    expiration_seconds: 3600,
    max_receive_count: 3,
    max_receive_queue: "queue.dlq"
  }
})

# Poll and acknowledge (Stream API)
{:ok, poll} = KubeMQ.Client.poll_queue(client,
  channel: "queue.tasks",
  max_items: 10,
  wait_timeout: 5_000
)

Enum.each(poll.messages, &process_message/1)
:ok = KubeMQ.PollResponse.ack_all(poll)

# Simple receive (Simple API)
{:ok, result} = KubeMQ.Client.receive_queue_messages(client, "queue.tasks",
  max_messages: 5,
  wait_timeout: 3_000
)

Channel Management

# Create channels
:ok = KubeMQ.Client.create_events_channel(client, "notifications")
:ok = KubeMQ.Client.create_queues_channel(client, "tasks")

# List channels
{:ok, channels} = KubeMQ.Client.list_queues_channels(client, "")
Enum.each(channels, fn ch ->
  IO.puts("#{ch.name} - active: #{ch.is_active}")
end)

# Delete and purge
:ok = KubeMQ.Client.purge_queue_channel(client, "tasks")
:ok = KubeMQ.Client.delete_queues_channel(client, "tasks")

Broadway Producers

Process KubeMQ messages using Broadway data pipelines.

defmodule MyEventPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {KubeMQ.Broadway.Events, [
          client: MyApp.KubeMQ,
          channel: "events.>",
          group: "broadway-consumer"
        ]}
      ],
      processors: [default: [concurrency: 4]],
      batchers: [default: [batch_size: 100, batch_timeout: 1_000]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    event = message.data
    IO.puts("Processing event on #{event.channel}")
    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    IO.puts("Batch of #{length(messages)} processed")
    messages
  end
end

Available producers: KubeMQ.Broadway.Events, KubeMQ.Broadway.EventsStore, KubeMQ.Broadway.Queues.

Configuration

All options are passed to KubeMQ.Client.start_link/1:

Option Type Default Description
addressString.t()"localhost:50000" KubeMQ server address (host:port)
client_idString.t()required Unique client identifier
auth_tokenString.t()nil Static JWT/OIDC auth token
credential_providermodule()nil Module implementing KubeMQ.CredentialProvider
tlskeyword()nil TLS/mTLS options (cacertfile, certfile, keyfile)
connection_timeoutpos_integer()10_000 Connection timeout (ms)
keepalive_timepos_integer()10_000 Keepalive interval (ms, min 5000)
keepalive_timeoutpos_integer()5_000 Keepalive timeout (ms)
max_receive_sizepos_integer()4_194_304 Max receive message size (bytes)
max_send_sizepos_integer()104_857_600 Max send message size (bytes)
default_channelString.t()nil Default channel for operations
default_cache_ttlpos_integer()900_000 Default query cache TTL (ms)
receive_buffer_sizepos_integer()10 Subscription receive buffer size
reconnect_buffer_sizepos_integer()1_000 Max buffered ops during reconnect
reconnect_policykeyword() see below Reconnection policy
retry_policykeyword() see below Retry policy for transient errors
nameatom() / {:via, ...}nil OTP process name
on_connected(-> :ok)nil Connection established callback
on_disconnected(-> :ok)nil Connection lost callback
on_reconnecting(-> :ok)nil Reconnection attempt callback
on_reconnected(-> :ok)nil Reconnection success callback
on_closed(-> :ok)nil Connection closed callback

Reconnect Policy Defaults

reconnect_policy: [
  enabled: true,
  initial_delay: 1_000,
  max_delay: 30_000,
  max_attempts: 0,      # 0 = unlimited
  multiplier: 2.0
]

Retry Policy Defaults

retry_policy: [
  max_retries: 3,
  initial_delay: 100,
  max_delay: 5_000,
  multiplier: 2.0
]

Supervision

Add the client to your application supervision tree:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {KubeMQ.Client,
        address: "localhost:50000",
        client_id: "my-app",
        name: MyApp.KubeMQ}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

Error Handling

All public functions return {:ok, result} | {:error, %KubeMQ.Error{}}. Bang variants (suffixed with !) raise KubeMQ.Error on failure.

case KubeMQ.Client.send_event(client, event) do
  :ok -> :ok
  {:error, %KubeMQ.Error{code: :validation} = err} ->
    Logger.warning("Validation: #{err.message}")
  {:error, %KubeMQ.Error{code: :transient, retryable?: true} = err} ->
    Logger.error("Transient error (will retry): #{err.message}")
  {:error, %KubeMQ.Error{} = err} ->
    Logger.error("#{err.code}: #{err.message}")
end

# Or use bang variants for scripts
KubeMQ.Client.send_event!(client, event)

Error Codes

Code Retryable Description
:transient Yes Temporary network/server issue
:timeout Yes Operation timed out
:throttling Yes Rate limited
:authentication No Invalid credentials
:authorization No Insufficient permissions
:validation No Invalid input
:not_found No Resource not found
:fatal No Unrecoverable error
:client_closed No Client has been closed
:buffer_full No Reconnect buffer overflow
:stream_broken No Stream connection lost

TLS / mTLS

# TLS (server verification only)
{:ok, client} = KubeMQ.Client.start_link(
  address: "broker:50000",
  client_id: "secure-app",
  tls: [cacertfile: "/path/to/ca.pem"]
)

# mTLS (mutual TLS)
{:ok, client} = KubeMQ.Client.start_link(
  address: "broker:50000",
  client_id: "mtls-app",
  tls: [
    cacertfile: "/path/to/ca.pem",
    certfile: "/path/to/client.pem",
    keyfile: "/path/to/client-key.pem",
    verify: :verify_peer
  ]
)

Examples

See the examples/ directory for 59 runnable .exs scripts covering all messaging patterns. Run any example with:

cd examples/events
elixir basic_pubsub.exs

Documentation

Full API documentation is available on HexDocs.

Additional guides:

Requirements

License

Apache-2.0 — see LICENSE for details.