ObanEvents
A lightweight, persistent event handling library for Elixir applications built on top of Oban.
Features
- 🔒 Persistent - Handler jobs survive application restarts
- 🔄 Retryable - Automatic retries on failure via Oban
- ⚡ Async - Non-blocking event emission
- 🔗 Transactional - Job creation works within database transactions
- 📊 Observable - Track processing via Oban Web UI
- ✅ Validated - Events are validated at compile time
- 🎯 Decoupled - Event emitters don't know about handlers
Installation
Add oban_events to your list of dependencies in mix.exs:
def deps do
[
{:oban_events, "~> 1.0"},
{:oban, "~> 2.0"}
]
end
Quick Start
1. Define Your Events
Create a module that uses ObanEvents, pass your Oban instance and optional global job config (defaults: queue: :oban_events, max_attempts: 3, priority: 2), then define your events and handlers:
defmodule MyApp.Events do
use ObanEvents,
oban: {MyApp.Oban, queue: :myapp_events, max_attempts: 3}
@events %{
user_created: [
{MyApp.EmailHandler, oban: [priority: 0, max_attempts: 5, tags: ["critical"]]},
MyApp.AnalyticsHandler
],
user_updated: [MyApp.CacheHandler],
order_placed: [MyApp.NotificationHandler]
}
end
2. Create Event Handlers
Implement the ObanEvents.Handler behaviour. Handlers receive an Event struct with your data plus metadata:
defmodule MyApp.EmailHandler do
use ObanEvents.Handler
require Logger
@impl true
def handle_event(:user_created, %Event{data: data, idempotency_key: key}) do
%{"user_id" => user_id, "email" => email} = data
# Use idempotency_key for outbox pattern
case OutboxEmail.insert(%{
idempotency_key: key,
user_id: user_id,
email: email,
template: :welcome,
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key) do
{:ok, _} ->
Logger.info("Welcome email queued for #{email}")
:ok
{:error, _} ->
Logger.info("Welcome email already queued for #{email}")
:ok
end
end
end
3. Emit Events
Emit events from your application code, preferably within transactions:
defmodule MyApp.Accounts do
alias MyApp.{Repo, Events}
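def create_user(attrs) do
# User.changeset/2 is a placeholder for your own schema's changeset function
changeset = User.changeset(%User{}, attrs)
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),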
{:ok, _jobs} <- Events.emit(:user_created, %{
user_id: user.id,
email: user.email
}) do
{:ok, user}
end
end)
end
end
How it works
flowchart TD
A[Business Logic] -->|1. emit event + data| B[Events.emit]
B -->|2. lookup handlers| C[Create Oban jobs]
C -->|3. persist dispatch jobs| D[Oban processes jobs]
D -->|4. dispatch| E[EmailHandler]
D -->|4. dispatch| F[AnalyticsHandler]
Event Metadata
Handlers receive an ObanEvents.Event struct containing your data plus metadata for deduplication, tracing, and correlation:
%Event{
data: %{"user_id" => 123}, # Your event data
event_id: "01933b7e-8a3f-7f6f-9e42-6c8f3a0b2d1e", # Unique per emit
idempotency_key: "01933b7e-9f2c-7a1b-8d4e-3c5f6a7b8c9d", # Unique per job
causation_id: "01933b7e-7f4a-7c2d-9b3e-4d5f6a7b8c9d", # Optional: parent event_id
correlation_id: "01933b7e-6e3b-7d5f-8c4e-5d6f7a8b9c0d" # Optional: business operation grouping
}
Metadata Fields
event_id - Emit Identifier
- Generated: Once per emit call (UUIDv7)
- Shared: All handlers for the same emit get the same event_id
- Use: Track which emit created jobs, pass as causation_id in child emits
def handle_event(:user_created, %Event{event_id: id, data: data}) do
# Emit child events, linking them to parent
Events.emit(:send_welcome_email, data, causation_id: id)
Events.emit(:create_user_profile, data, causation_id: id)
end
idempotency_key - Job Deduplication
- Generated: Unique per handler job (UUIDv7)
- Use: Outbox pattern, prevent duplicate processing on retry
def handle_event(:send_email, %Event{idempotency_key: key, data: data}) do
# Atomic insert - if key exists, we already processed this
OutboxEmail.insert(%{
idempotency_key: key,
user_id: data["user_id"],
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key)
end
causation_id - Event Chains (Optional)
- Provided: User passes parent's event_id when emitting
- Use: Build audit trails, understand "why did this happen?"
# Event chain:
user_registered
event_id: "01933b7e-1111-...", causation_id: nil
└─> send_welcome_email
event_id: "01933b7e-2222-...", causation_id: "01933b7e-1111-..."
└─> email_delivered
event_id: "01933b7e-3333-...", causation_id: "01933b7e-2222-..."
correlation_id - Business Operation Grouping (Optional)
- Provided: User generates and passes to related emits
- Use: Group all events from a single business operation
# User upgrades subscription - many events happen
correlation_id = UUIDv7.generate()
Events.emit(:subscription_upgraded, data, correlation_id: correlation_id)
Events.emit(:old_subscription_cancelled, data, correlation_id: correlation_id)
Events.emit(:payment_processed, data, correlation_id: correlation_id)
Events.emit(:invoice_generated, data, correlation_id: correlation_id)
# In logs: filter by correlation_id to see all related events
When to use what
| Metadata | Always Auto-Generated | User Can Override | Primary Use Case |
|---|---|---|---|
| data | No | Yes (required) | Your event payload |
| event_id | Yes | No | Event causation tracking |
| idempotency_key | Yes | No | Prevent duplicate processing |
| causation_id | No | Yes (optional) | Build event chains |
| correlation_id | No | Yes (optional) | Group business operations |
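The field semantics in the table can be illustrated with a small standalone sketch. It does not use ObanEvents itself: `MetadataSketch`, `build_jobs/4`, and the random-hex `new_id/0` (a stand-in for UUIDv7) are hypothetical names, and the args keys beyond `event_name`, `handler_module`, and `data` are assumptions about how the metadata might be stored.

```elixir
defmodule MetadataSketch do
  # Stand-in for UUIDv7 generation: 16 random bytes as hex.
  # Unique enough for the sketch, but not time-ordered like a real UUIDv7.
  defp new_id, do: Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)

  # Builds one args map per handler for a single emit (hypothetical layout).
  def build_jobs(event_name, data, handlers, opts \\ []) do
    # Generated once per emit, so every handler job shares it
    event_id = new_id()

    for handler <- handlers do
      %{
        "event_name" => Atom.to_string(event_name),
        "handler_module" => Atom.to_string(handler),
        "data" => data,
        "event_id" => event_id,
        # Generated per handler job, so each job can be deduplicated on its own
        "idempotency_key" => new_id(),
        # Only present when the caller passes them
        "causation_id" => Keyword.get(opts, :causation_id),
        "correlation_id" => Keyword.get(opts, :correlation_id)
      }
    end
  end
end

jobs =
  MetadataSketch.build_jobs(
    :user_created,
    %{"user_id" => 123},
    [EmailHandler, AnalyticsHandler],
    correlation_id: "op-1"
  )
```

Both maps in `jobs` carry the same `event_id` and `correlation_id`, while each has its own `idempotency_key`; `causation_id` stays `nil` because the caller did not pass one.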
Configuration options
Global configuration
Configure the Oban instance and default job options:
defmodule MyApp.Events do
# Simple: just specify the Oban instance (uses all defaults)
use ObanEvents, oban: MyApp.Oban
# Or: specify Oban instance + custom options
use ObanEvents,
oban: {MyApp.Oban, queue: :my_events, max_attempts: 5, priority: 1, tags: ["myapp"]}
@events %{
# ...
}
end
Default Oban options:
- queue: :oban_events
- max_attempts: 3
- priority: 2 (0-9, lower is higher priority)
- tags: []
Per-handler configuration
Override global options for specific handlers using tuple syntax:
@events %{
user_created: [
# Critical handler with higher priority and more retries
{MyApp.EmailHandler, oban: [priority: 0, max_attempts: 10, tags: ["critical", "email"]]},
# Low-priority handler in different queue
{MyApp.AnalyticsHandler, oban: [queue: :analytics, priority: 3]},
# Delayed handler - send reminder email after 24 hours
{MyApp.ReminderHandler, oban: [schedule_in: {24, :hours}]},
# Handler using all global defaults
MyApp.NotificationHandler
]
}
Per-handler options:
Any Oban.Job.new/2 option can be passed under the :oban key. Common options:
- queue - Override queue (atom)
- max_attempts - Override retry count (integer)
- priority - Override priority (0-9, lower is higher)
- tags - Override tags (list of strings)
- scheduled_at - Schedule for future execution (DateTime)
- schedule_in - Delay execution (integer seconds or {n, :unit} tuple)
- meta - Additional job metadata (map)
- unique - Uniqueness constraints (keyword list)
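One way to picture how the three layers combine (library defaults, global options, per-handler options) is a pair of keyword merges. This is a sketch only: `OptionMergeSketch` and `resolve/2` are hypothetical, and it assumes a later layer replaces an earlier value wholesale (for example, per-handler tags replace global tags rather than being appended).

```elixir
defmodule OptionMergeSketch do
  # Defaults as listed under "Default Oban options"
  @defaults [queue: :oban_events, max_attempts: 3, priority: 2, tags: []]

  # Later layers win: defaults < global options < per-handler options
  def resolve(global_opts, handler_opts) do
    @defaults
    |> Keyword.merge(global_opts)
    |> Keyword.merge(handler_opts)
  end
end

global = [queue: :my_events, max_attempts: 5]

# Handler declared as {Handler, oban: [priority: 0, tags: ["critical"]]}
email = OptionMergeSketch.resolve(global, priority: 0, tags: ["critical"])

# Handler declared as a bare module, so it only sees defaults plus global options
plain = OptionMergeSketch.resolve(global, [])
```

Here `email` ends up on `:my_events` with priority 0 and five attempts, while `plain` keeps the default priority of 2.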
API
Your events module provides these functions:
emit/2 and emit/3
Emit an event to all registered handlers, with optional metadata in the third argument.
@spec emit(atom(), map()) :: {:ok, [Oban.Job.t()]}
@spec emit(atom(), map(), keyword()) :: {:ok, [Oban.Job.t()]}
# Raises ArgumentError if event is not registered
MyApp.Events.emit(:user_created, %{user_id: 123, email: "user@example.com"})
MyApp.Events.emit(:user_created, %{user_id: 123}, correlation_id: UUIDv7.generate())
get_handlers!/1
Get all handlers registered for an event.
@spec get_handlers!(atom()) :: [module()]
MyApp.Events.get_handlers!(:user_created)
# => [MyApp.EmailHandler, MyApp.AnalyticsHandler]
all_events/0
List all registered event names.
@spec all_events() :: [atom()]
MyApp.Events.all_events()
# => [:user_created, :user_updated, :order_placed]
registered?/1
Check if an event is registered.
@spec registered?(atom()) :: boolean()
MyApp.Events.registered?(:user_created)
# => true
Handler implementation
Handlers must implement the handle_event/2 callback:
defmodule MyApp.AnalyticsHandler do
use ObanEvents.Handler
@impl true
def handle_event(:user_created, %Event{data: data}) do
%{"user_id" => user_id} = data
MyApp.Analytics.track("User Created", user_id: user_id)
:ok
end
@impl true
def handle_event(:user_updated, %Event{data: data}) do
%{"user_id" => user_id, "changes" => changes} = data
MyApp.Analytics.track("User Updated", user_id: user_id, changes: changes)
:ok
end
end
Return values
Handlers should return:
- :ok - Event processed successfully
- {:ok, result} - Event processed successfully with a result
- {:error, reason} - Event processing failed (will trigger Oban retry)
Handlers may also raise exceptions, which will trigger Oban's retry mechanism.
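The return-value rules above can be summarised as a small classification function. This is an illustrative sketch, not the library's dispatch code: `ReturnValueSketch` and `run/1` are hypothetical, and `:completed` / `{:retry, reason}` are labels invented here to stand for "job succeeds" and "Oban retries the job".

```elixir
defmodule ReturnValueSketch do
  # Runs a zero-arity function standing in for a handler call
  # and classifies the outcome.
  def run(handler_call) do
    case handler_call.() do
      :ok -> :completed
      {:ok, _result} -> :completed
      {:error, reason} -> {:retry, reason}
    end
  rescue
    # A raised exception is treated like a failure and leads to a retry
    exception -> {:retry, Exception.message(exception)}
  end
end

ok_outcome = ReturnValueSketch.run(fn -> :ok end)
error_outcome = ReturnValueSketch.run(fn -> {:error, :rate_limited} end)
raise_outcome = ReturnValueSketch.run(fn -> raise "boom" end)
```

Returning `:ok` for a failure that can never succeed (such as invalid data) is therefore the way to stop retries, while `{:error, reason}` or an exception keeps the job in the retry cycle until max_attempts is reached.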
Best practices
1. Always Use Transactions
Emit events within transactions to ensure atomicity:
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),
{:ok, _jobs} <- Events.emit(:user_created, %{user_id: user.id}) do
{:ok, user}
end
end)
# If insert fails, emit never happens ✅
# If emit fails, transaction rolls back ✅
2. Make Handlers Idempotent
Critical: Handlers may be executed multiple times due to:
- Oban retries on failure
- Application restarts during processing
- Infrastructure issues (database failover, network timeouts)
Design handlers to be safe when run multiple times with the same event:
# Example 1: Outbox pattern for external services (RECOMMENDED)
def handle_event(:send_welcome_email, %Event{
data: data,
idempotency_key: key,
event_id: trace_id
}) do
%{"user_id" => user_id, "email" => email} = data
# Write to outbox table with idempotency_key
# Separate worker processes the outbox
OutboxEmail.insert(%{
idempotency_key: key, # Prevents duplicates on retry
trace_id: trace_id, # For tracing/correlation
user_id: user_id,
email: email,
template: :welcome,
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key)
:ok
end
# Example 2: Database operations - use upserts
def handle_event(:create_user_profile, %Event{data: data}) do
%{"user_id" => user_id} = data
%UserProfile{user_id: user_id}
|> Repo.insert(
on_conflict: :nothing,
conflict_target: :user_id
)
:ok
end
# Example 3: Direct external API call (use event_id for tracing)
# Note: Prefer outbox pattern for critical operations
def handle_event(:track_analytics, %Event{
data: data,
event_id: trace_id,
idempotency_key: key
}) do
# Pass event_id as trace/correlation ID to external service
AnalyticsAPI.track(
event: "user_signup",
properties: data,
trace_id: trace_id, # Links logs across systems
idempotency_key: key # Service can deduplicate
)
:ok
end
# Example 4: Multiple database changes - use Ecto.Multi
def handle_event(:subscription_upgraded, %Event{data: data}) do
%{"user_id" => user_id, "new_plan" => new_plan, "old_plan" => old_plan} = data
# Use Ecto.Multi for multiple related operations
Multi.new()
|> Multi.run(:upgrade, fn _repo, _changes ->
Subscriptions.upgrade(user_id, new_plan)
end)
|> Multi.run(:credits, fn _repo, _changes ->
Credits.add_bonus(user_id, new_plan)
end)
|> Multi.insert(:audit, Audit.changeset(%{
user_id: user_id,
old_plan: old_plan,
new_plan: new_plan
}))
|> Repo.transaction()
|> case do
{:ok, _} -> :ok
{:error, _failed_operation, _failed_value, _changes} ->
{:error, :subscription_upgrade_failed}
end
end
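# Example 5: Increment operations - use atomic updates
def handle_event(:page_view, %Event{data: %{"page_id" => page_id}}) do
# The increment is atomic (safe under concurrency) but not idempotent:
# a retry after a successful update counts the view twice. Pair it with an
# idempotency_key check (as in Example 1) when exact counts matter.
Repo.update_all(
from(p in Page, where: p.id == ^page_id),
inc: [view_count: 1]
)
:ok
end
3. Include All Necessary Data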
Don't rely on database lookups for data that might change:
# Good: Include all data needed
Events.emit(:status_changed, %{
record_id: record.id,
old_status: old_status,
new_status: record.status
})
# Bad: Handler has to query DB (old_status might be wrong)
Events.emit(:status_changed, %{
record_id: record.id
})
4. Use JSON-Serializable Data
Event data must be JSON-serializable (no PIDs, refs, or functions):
# Good
Events.emit(:user_created, %{
user_id: user.id,
email: user.email,
amount: Decimal.to_string(user.balance)
})
# Bad: Full struct is not reliably serializable
Events.emit(:user_created, %{user: user})
5. Return Errors for Retriable Failures
def handle_event(:send_notification, %Event{data: data}) do
case NotificationService.send(data) do
{:ok, _} -> :ok
{:error, :rate_limited} -> {:error, :rate_limited} # Will retry
{:error, :invalid_data} -> :ok # Don't retry invalid data
end
end
6. Handle permanently failed jobs
When a handler exhausts all retry attempts, you may want to take action like alerting, logging to a dead letter queue, or notifying administrators. Override handle_exhausted/4 in your events module:
defmodule MyApp.Events do
use ObanEvents, oban: MyApp.Oban
@events %{
user_created: [MyApp.EmailHandler]
}
# Called when a handler fails after max_attempts
def handle_exhausted(event_name, handler_module, event, error) do
# Log to your error tracking service
Sentry.capture_message("Event handler permanently failed",
extra: %{
event: event_name,
handler: handler_module,
event_id: event.event_id,
error: error
}
)
# Store in dead letter queue for manual review
DeadLetterQueue.insert(%{
event_name: event_name,
handler: handler_module,
event_data: event.data,
event_id: event.event_id,
error: inspect(error),
failed_at: DateTime.utc_now()
})
:ok
end
end
Default behavior:
If you don't override handle_exhausted/4, the worker will log a warning with the event details. This ensures failures are always logged even without custom handling.
The callback receives:
- event_name - The event that failed (atom)
- handler_module - The handler that failed (module)
- event - The full Event struct with all metadata
- error - The error reason from the final attempt
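As a rough mental model of when such a callback fires, the sketch below compares a job's attempt counter with its max_attempts after a failure. How ObanEvents detects exhaustion internally is not stated here, so treat this as an assumption: `ExhaustedSketch` and `after_failure/2` are hypothetical, and the plain maps stand in for Oban.Job structs, which do carry `attempt` and `max_attempts` fields.

```elixir
defmodule ExhaustedSketch do
  # Called after a handler failure. When the failing attempt was the last
  # allowed one, run the exhausted callback; otherwise the job will retry.
  def after_failure(%{attempt: attempt, max_attempts: max}, on_exhausted)
      when attempt >= max do
    on_exhausted.()
    :exhausted
  end

  def after_failure(_job, _on_exhausted), do: :will_retry
end

# Hypothetical callback: notify the current process instead of alerting
notify = fn -> send(self(), :alerted) end

first_failure = ExhaustedSketch.after_failure(%{attempt: 1, max_attempts: 3}, notify)
final_failure = ExhaustedSketch.after_failure(%{attempt: 3, max_attempts: 3}, notify)
```

With max_attempts set to 3, the first two failures return `:will_retry` and only the third invokes the callback.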
Handler management
Renaming or removing handler modules
Handler module names are serialized to the database as fully-qualified Elixir module atoms (e.g., "Elixir.MyApp.EmailHandler"). When you rename or remove a handler module, existing queued jobs will still reference the old module name and will fail when Oban tries to execute them.
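The failure mode is easy to reproduce in isolation. The sketch below shows how a module name turns into the stored string and what happens when that string no longer maps to a loaded module. `Sketch.EmailHandler` and `Sketch.Resolver` are hypothetical, and the way ObanEvents actually resolves the stored name is an assumption.

```elixir
defmodule Sketch.EmailHandler do
  def handle_event(_event_name, _event), do: :ok
end

defmodule Sketch.Resolver do
  # Turns a stored module name back into a module, if it still exists.
  # String.to_atom/1 is acceptable in a sketch; production code should avoid
  # creating atoms from untrusted input.
  def resolve(stored_name) do
    module = String.to_atom(stored_name)

    if Code.ensure_loaded?(module) and function_exported?(module, :handle_event, 2) do
      {:ok, module}
    else
      {:error, {:unknown_handler, stored_name}}
    end
  end
end

# What ends up in the job args for this handler
stored = Atom.to_string(Sketch.EmailHandler)

still_there = Sketch.Resolver.resolve(stored)
renamed_away = Sketch.Resolver.resolve("Elixir.Sketch.RenamedAway")
```

A job queued before a rename is in the position of `renamed_away`: the stored string points at a module that no longer exists, which is why the old name needs to stay resolvable until those jobs have drained.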
Safe migration strategy:
Use a module alias/stub to maintain backward compatibility during the transition:
Example 1: Renaming a handler
# After renaming MyApp.EmailHandler to MyApp.Notifications.EmailHandler
# 1. Create the new module with your desired name
defmodule MyApp.Notifications.EmailHandler do
use ObanEvents.Handler
@impl true
def handle_event(:user_created, %Event{data: data}) do
# Your handler logic
:ok
end
end
# 2. Keep the old module as an alias
defmodule MyApp.EmailHandler do
@moduledoc false
defdelegate handle_event(event, event_struct), to: MyApp.Notifications.EmailHandler
end
# 3. Update your @events to use the new name
defmodule MyApp.Events do
use ObanEvents, oban: MyApp.Oban
@events %{
user_created: [
MyApp.Notifications.EmailHandler # New jobs use new name
]
}
end
Example 2: Removing a handler
# Removing MyApp.OldFeatureHandler
# 1. Remove from @events (new jobs won't be created)
defmodule MyApp.Events do
use ObanEvents, oban: MyApp.Oban
@events %{
user_created: [
# MyApp.OldFeatureHandler removed
MyApp.EmailHandler
]
}
end
# 2. Keep a stub module to handle old queued jobs gracefully
defmodule MyApp.OldFeatureHandler do
use ObanEvents.Handler
@impl true
def handle_event(_event, _event_struct) do
# Either process gracefully or just return :ok to discard
:ok
end
end
How this works:
- Existing queued jobs continue to work (either delegated or gracefully handled)
- New jobs use the updated handler list
- Zero downtime - no jobs fail during the transition
Cleanup:
After all old jobs have processed (check Oban Web UI), you can safely remove the alias/stub module. This typically takes as long as your retry window (default: a few hours with exponential backoff).
Testing
Testing event emission
use Oban.Testing, repo: MyApp.Repo
test "emits user_created event" do
{:ok, user} = Accounts.create_user(%{email: "test@example.com"})
assert_enqueued(
worker: ObanEvents.DispatchWorker,
args: %{
"event_name" => "user_created",
"handler_module" => "Elixir.MyApp.EmailHandler",
"data" => %{"user_id" => user.id}
}
)
end
Testing handlers
Use ObanEvents.Testing to easily create Event structs for testing:
defmodule MyApp.EmailHandlerTest do
use ExUnit.Case
import ObanEvents.Testing
test "sends welcome email" do
event = build_event(%{"user_id" => 123, "email" => "test@example.com"})
assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
assert_email_sent(to: "test@example.com", subject: "Welcome!")
end
test "with custom metadata" do
event = build_event(
%{"user_id" => 123},
causation_id: "parent-event-id",
correlation_id: "test-correlation"
)
assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
end
end
Testing with Oban inline mode
For integration tests, configure Oban to execute jobs inline:
# config/test.exs
config :my_app, Oban,
testing: :inline,
queues: false,
plugins: false
# In test
test "creates user and sends email" do
{:ok, user} = Accounts.create_user(%{email: "test@example.com"})
# Email sent immediately in test mode
assert_email_sent(to: "test@example.com")
end
Observability
Oban Web UI
View event processing history, errors, and retries:
# In router.ex (development only)
import Phoenix.LiveDashboard.Router
import Oban.Web.Router
scope "/dev" do
pipe_through :browser
live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry
# Mounts the Oban Web dashboard at /dev/oban
oban_dashboard "/oban"
end
Logging
ObanEvents logs successful event processing at debug level and failures at error level:
[debug] Processing event: user_created with handler: MyApp.EmailHandler
[debug] Event processed successfully: user_created by MyApp.EmailHandler
[error] Event handler failed: user_created by MyApp.EmailHandler, error: :network_timeout
To enable debug logs at runtime use:
iex> Logger.configure(level: :debug)
Troubleshooting
Events not processing
Check Oban queue configuration:
# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [
oban_events: 10 # Must match the queue your events module uses (default: :oban_events)
]
Check job status in database:
SELECT * FROM oban_jobs
WHERE queue = 'oban_events'
ORDER BY inserted_at DESC
LIMIT 10;
Events not emitted
Verify event is registered:
iex> MyApp.Events.registered?(:user_created)
true
iex> MyApp.Events.all_events()
[:user_created, :user_updated, ...]
Check transaction succeeded:
Add logging to verify the transaction completes:
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),
{:ok, jobs} <- Events.emit(:user_created, data) do
Logger.info("Emitted #{length(jobs)} jobs")
{:ok, user}
end
end)
Handler failures
View errors in Oban Web UI at /dev/oban
Check application logs for handler errors
Manually retry failed job:
iex> job = Oban.Job |> Repo.get(job_id)
iex> Oban.retry_job(job)
Examples
See the repository test suite for complete examples of:
- Event emission
- Handler implementation
- Transaction behavior
- Testing patterns
Disclaimer
ObanEvents is an independent, community-built library and is not an official Oban project.
Credits
Built with Oban by Parker Selbert.