Orkestra
A lightweight CQRS/ES toolkit for Elixir. Pluggable message bus, event store, and OpenTelemetry tracing built in.
Orkestra gives you the building blocks without the framework lock-in. Define commands and events with a declarative DSL, wire up handlers that auto-subscribe, and swap between in-process PubSub and distributed RabbitMQ with a single config change.
Installation
def deps do
[
{:orkestra, "~> 0.1.0"}
]
endQuick start
Define a command
defmodule MyApp.Commands.PlaceOrder do
use Orkestra.Command
param :product_id, :string, required: true
param :quantity, :integer, required: true
param :notes, :string, default: ""
endDefine an event
defmodule MyApp.Events.OrderPlaced do
use Orkestra.Event
field :order_id, :string, required: true
field :product_id, :string, required: true
field :quantity, :integer, required: true
field :placed_at, :string, required: true
endHandle the command
defmodule MyApp.Handlers.PlaceOrderHandler do
use Orkestra.CommandHandler,
command: MyApp.Commands.PlaceOrder
@impl true
def execute(command, metadata) do
order_id = Ecto.UUID.generate()
# ... your business logic ...
{:ok, %{order_id: order_id}}
end
endReact to the event
defmodule MyApp.Handlers.SendOrderConfirmation do
use Orkestra.EventHandler,
event: MyApp.Events.OrderPlaced
@impl true
def handle_event(event, _metadata) do
MyApp.Mailer.send_confirmation(event.data.order_id)
:ok
end
endDispatch
alias Orkestra.{CommandEnvelope, MessageBus}
{:ok, cmd} = MyApp.Commands.PlaceOrder.new(%{
product_id: "sku_42",
quantity: 3
}, actor_id: "user_123", source: "web")
bus = MessageBus.impl()
:ok = bus.dispatch(CommandEnvelope.wrap(cmd, max_retries: 2))Supervision tree
children = [
{Phoenix.PubSub, name: MyApp.PubSub},
Orkestra.MessageBus.PubSub,
MyApp.Handlers.PlaceOrderHandler,
MyApp.Handlers.SendOrderConfirmation
]Configuration
Message bus adapter
# In-process (dev, test, single-node)
config :orkestra, Orkestra.MessageBus,
adapter: Orkestra.MessageBus.PubSub,
app_prefix: MyApp
config :orkestra, Orkestra.MessageBus.PubSub,
pubsub: MyApp.PubSub
# Distributed (production, multi-node)
config :orkestra, Orkestra.MessageBus,
adapter: Orkestra.MessageBus.RabbitMQ,
app_prefix: MyApp
config :orkestra, Orkestra.MessageBus.RabbitMQ,
channel_provider: fn -> MyApp.RabbitMQ.Connection.channel() endTopic derivation
Topics are derived automatically from module names. The app_prefix is stripped:
MyApp.Orders.Commands.PlaceOrder -> "orders.commands.place_order"
MyApp.Orders.Events.OrderPlaced -> "orders.events.order_placed"Event store
# EventStoreDB (production)
config :orkestra, Orkestra.EventStore,
adapter: Orkestra.EventStore.EventStoreDB
config :orkestra, Orkestra.EventStore.EventStoreDB,
connection_string: "esdb://localhost:2113?tls=false"
# In-memory (test)
config :orkestra, Orkestra.EventStore,
adapter: Orkestra.EventStore.InMemoryCore concepts
Commands
Commands represent an intent to change the system. They are validated, dispatched to a single handler, and either succeed or fail.
defmodule CreateAccount do
use Orkestra.Command
param :email, :string, required: true
param :name, :string, required: true
param :plan, :string, default: "free"
@impl true
def validate(%{email: email}) do
if String.contains?(email, "@"), do: :ok, else: {:error, :invalid_email}
end
end
{:ok, cmd} = CreateAccount.new(%{email: "a@b.com", name: "Alice"})
{:error, {:missing_params, [:email]}} = CreateAccount.new(%{name: "Bob"})
{:error, :invalid_email} = CreateAccount.new(%{email: "nope", name: "Eve"})Events
Events represent something that happened. They are immutable facts, never rejected. Events can be derived from commands or other events, preserving the correlation chain.
defmodule AccountCreated do
use Orkestra.Event
field :account_id, :string, required: true
field :email, :string, required: true
field :plan, :string, required: true
end
# From a command (preserves correlation, sets causation)
{:ok, event} = AccountCreated.from_command(cmd, %{
account_id: "acc_123",
email: "a@b.com",
plan: "free"
})
event.metadata.correlation_id == cmd.metadata.correlation_id # true
event.metadata.causation_id == cmd.id # trueMetadata
Every command and event carries metadata that flows through the pipeline:
%Orkestra.Metadata{
correlation_id: "abc123", # links an entire chain of commands/events
causation_id: "cmd_456", # what directly caused this
actor_id: "user_789", # who initiated it
actor_type: :user, # :user | :system | :expert | :scheduler
source: "web", # where it originated
issued_at: ~U[2026-03-27 12:00:00Z]
}
# Derive child metadata (preserves correlation, sets causation)
child = Orkestra.Metadata.derive(parent_metadata, "parent_id")Envelopes
Envelopes wrap commands and events with dispatch context.
Command envelopes track dispatch lifecycle and retries:
env = CommandEnvelope.wrap(cmd, max_retries: 3)
env.status # :pending -> :dispatched -> :succeeded | :failed | :rejected
CommandEnvelope.retryable?(env) # true if failed and attempts <= max_retriesEvent envelopes track delivery to multiple handlers:
env = EventEnvelope.wrap(event) |> EventEnvelope.mark_published()
env = EventEnvelope.register_handler(env, "SendEmail")
env = EventEnvelope.register_handler(env, "UpdateIndex")
env = EventEnvelope.mark_handler_succeeded(env, "SendEmail")
env = EventEnvelope.mark_handler_failed(env, "UpdateIndex")
env.status # :partially_handledEvent handlers
Subscribe to one event, multiple events, or wildcard patterns:
# Single event
use Orkestra.EventHandler,
event: MyApp.Events.OrderPlaced
# Multiple events
use Orkestra.EventHandler,
events: [MyApp.Events.OrderPlaced, MyApp.Events.OrderCancelled]
# Wildcard pattern
use Orkestra.EventHandler,
topic: "orders.events.#"
# With retry config
use Orkestra.EventHandler,
event: MyApp.Events.OrderPlaced,
max_retries: 5Message bus adapters
PubSub (in-process)
Synchronous dispatch. Commands go to one handler, events broadcast to all subscribers. Retries are immediate (recursive). Dead-lettered messages are broadcast on "orkestra:deadletter".
RabbitMQ (distributed)
Commands use exchange orkestra.commands with one queue per command type (competing consumers). Events use exchange orkestra.events with one queue per handler (fan-out).
Features:
-
Retry tracking via
x-deathheaders (native RabbitMQ) -
Max retries via
x-max-retriesheader -
Dead letter exchange
orkestra.deadletterwith catch-all queue - All queues declared with DLX configuration
- W3C trace context propagation in AMQP headers
Observability
Orkestra is instrumented with OpenTelemetry out of the box.
Span hierarchy
orkestra.command.dispatch (message bus)
orkestra.command.handle (command handler)
orkestra.event.publish (message bus)
orkestra.event.handle (event handler)
Additional spans: orkestra.retry, orkestra.rabbitmq.publish (kind: producer), orkestra.rabbitmq.consume (kind: consumer).
Span attributes
All spans include: orkestra.command.type, orkestra.command.id, orkestra.correlation_id, orkestra.causation_id, orkestra.actor_id, orkestra.handler.
Structured logging
All log messages use Logger metadata instead of string interpolation:
[info] Command handler subscribed handler=MyApp.HandleOrder topic=orders.commands.place_order orkestra=command_handler
[warning] Handler nack, requeuing handler=MyApp.HandleOrder attempt=2 max_retries=3 orkestra=rabbitmq
[error] Dead letter recorded handler=MyApp.HandleOrder reason=:timeout orkestra=pubsub
Logger metadata set during handler execution: correlation_id, causation_id, actor_id, trace_id, span_id.
Distributed tracing (RabbitMQ)
Trace context is injected into AMQP message headers on publish and extracted on consume, creating linked spans across nodes. Uses OpentelemetryProcessPropagator for context propagation across BEAM processes.
License
MIT