ObanEvents

A lightweight, persistent event handling library for Elixir applications built on top of Oban.

Features

Installation

Add oban_events to your list of dependencies in mix.exs:

def deps do
  [
    {:oban_events, "~> 1.0"},
    {:oban, "~> 2.0"}
  ]
end

Quick Start

1. Define Your Events

Create a module that uses ObanEvents and pass it your Oban instance, optionally with global job options (defaults: queue: :oban_events, max_attempts: 3, priority: 2). Then define your events and their handlers in @events:

defmodule MyApp.Events do
  use ObanEvents,
    oban: {MyApp.Oban, queue: :myapp_events, max_attempts: 3}

  @events %{
    user_created: [
      {MyApp.EmailHandler, oban: [priority: 0, max_attempts: 5, tags: ["critical"]]},
      MyApp.AnalyticsHandler
    ],
    user_updated: [MyApp.CacheHandler],
    order_placed: [MyApp.NotificationHandler]
  }
end

2. Create Event Handlers

Implement the ObanEvents.Handler behaviour. Handlers receive an Event struct with your data plus metadata:

defmodule MyApp.EmailHandler do
  use ObanEvents.Handler

  require Logger

  @impl true
  def handle_event(:user_created, %Event{data: data, idempotency_key: key}) do
    %{"user_id" => user_id, "email" => email} = data

    # Use idempotency_key for outbox pattern
    case OutboxEmail.insert(%{
      idempotency_key: key,
      user_id: user_id,
      email: email,
      template: :welcome,
      status: :pending
    }, on_conflict: :nothing, conflict_target: :idempotency_key) do
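      {:ok, _} ->
        # Assuming OutboxEmail.insert behaves like Repo.insert: with
        # on_conflict: :nothing a duplicate key is skipped, not an error,
        # so this branch also covers the "already queued" case
        Logger.info("Welcome email queued for #{email}")
        :ok

      {:error, reason} ->
        # A genuine failure: return it so Oban retries the job
        Logger.error("Failed to queue welcome email for #{email}: #{inspect(reason)}")
        {:error, reason}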
    end
  end
end

3. Emit Events

Emit events from your application code, preferably within transactions:

defmodule MyApp.Accounts do
  alias MyApp.{Repo, Events}

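  def create_user(attrs) do
    # MyApp.Accounts.User.changeset/2 is a placeholder for your own schema's changeset function
    changeset = MyApp.Accounts.User.changeset(%MyApp.Accounts.User{}, attrs)

    Repo.transact(fn ->
      with {:ok, user} <- Repo.insert(changeset),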
           {:ok, _jobs} <- Events.emit(:user_created, %{
             user_id: user.id,
             email: user.email
           }) do
        {:ok, user}
      end
    end)
  end
end

How it works

flowchart TD
    A[Business Logic] -->|1. emit event + data| B[Events.emit]
    B -->|2. lookup handlers| C[Create Oban jobs]
    C -->|3. persist dispatch jobs| D[Oban processes jobs]
    D -->|4. dispatch| E[EmailHandler]
    D -->|4. dispatch| F[AnalyticsHandler]
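
The flow above can be sketched in plain Elixir without Oban or a database. This is only an illustration of the fan-out, not the library's implementation: EmitSketch, its hard-coded registry, and new_id/0 (a stand-in for a UUIDv7 generator) are hypothetical, and placing event_id and idempotency_key in the job map is an assumption. The "event_name", "handler_module", and "data" keys follow the job args shown in the Testing section.

```elixir
defmodule EmitSketch do
  # Hypothetical registry; mirrors the @events map from the Quick Start.
  @events %{user_created: [MyApp.EmailHandler, MyApp.AnalyticsHandler]}

  # Builds one job description per registered handler.
  def emit(event_name, data) when is_atom(event_name) and is_map(data) do
    case Map.fetch(@events, event_name) do
      {:ok, handlers} ->
        # One event_id per emit call, shared by every job it produces.
        event_id = new_id()

        jobs =
          Enum.map(handlers, fn handler ->
            %{
              "event_name" => Atom.to_string(event_name),
              "handler_module" => Atom.to_string(handler),
              "data" => data,
              "event_id" => event_id,
              # One idempotency_key per job, i.e. per handler.
              "idempotency_key" => new_id()
            }
          end)

        {:ok, jobs}

      :error ->
        raise ArgumentError, "event #{inspect(event_name)} is not registered"
    end
  end

  # Stand-in for a real UUIDv7 generator; only uniqueness matters here.
  defp new_id, do: Integer.to_string(System.unique_integer([:positive, :monotonic]))
end

{:ok, jobs} = EmitSketch.emit(:user_created, %{"user_id" => 123})
IO.inspect(Enum.map(jobs, & &1["handler_module"]))
# => ["Elixir.MyApp.EmailHandler", "Elixir.MyApp.AnalyticsHandler"]
```

Because each handler gets its own job, one handler failing and retrying does not affect the others.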

Event Metadata

Handlers receive an ObanEvents.Event struct containing your data plus metadata for deduplication, tracing, and correlation:

%Event{
  data: %{"user_id" => 123},                                    # Your event data
  event_id: "01933b7e-8a3f-7f6f-9e42-6c8f3a0b2d1e",           # Unique per emit
  idempotency_key: "01933b7e-9f2c-7a1b-8d4e-3c5f6a7b8c9d",    # Unique per job
  causation_id: "01933b7e-7f4a-7c2d-9b3e-4d5f6a7b8c9d",       # Optional: parent event_id
  correlation_id: "01933b7e-6e3b-7d5f-8c4e-5d6f7a8b9c0d"      # Optional: business operation grouping
}

Metadata Fields

event_id - Emit Identifier

def handle_event(:user_created, %Event{event_id: id, data: data}) do
  # Emit child events, linking them to parent
  Events.emit(:send_welcome_email, data, causation_id: id)
  Events.emit(:create_user_profile, data, causation_id: id)
end

idempotency_key - Job Deduplication

def handle_event(:send_email, %Event{idempotency_key: key, data: data}) do
  # Atomic insert - if key exists, we already processed this
  OutboxEmail.insert(%{
    idempotency_key: key,
    user_id: data["user_id"],
    status: :pending
  }, on_conflict: :nothing, conflict_target: :idempotency_key)
end

causation_id - Event Chains (Optional)

# Event chain:
user_registered
  event_id: "01933b7e-1111-...", causation_id: nil
  └─> send_welcome_email
        event_id: "01933b7e-2222-...", causation_id: "01933b7e-1111-..."
      └─> email_delivered
            event_id: "01933b7e-3333-...", causation_id: "01933b7e-2222-..."
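
To make the chain concrete, here is a small sketch that walks from any event back to its root by following causation_id. It assumes the events have already been collected somewhere (for example from logs) as plain maps; the CausationChainSketch module and the :name field are hypothetical and not part of ObanEvents.

```elixir
defmodule CausationChainSketch do
  # Returns the event names from the root cause down to the given event.
  def chain(events, event_id) do
    by_id = Map.new(events, &{&1.event_id, &1})
    walk(by_id, event_id, [])
  end

  # A nil causation_id marks the root of the chain.
  defp walk(_by_id, nil, acc), do: acc

  defp walk(by_id, id, acc) do
    case Map.fetch(by_id, id) do
      {:ok, event} -> walk(by_id, event.causation_id, [event.name | acc])
      # Unknown parent: stop and return what was found so far.
      :error -> acc
    end
  end
end

events = [
  %{name: :user_registered, event_id: "1111", causation_id: nil},
  %{name: :send_welcome_email, event_id: "2222", causation_id: "1111"},
  %{name: :email_delivered, event_id: "3333", causation_id: "2222"}
]

IO.inspect(CausationChainSketch.chain(events, "3333"))
# => [:user_registered, :send_welcome_email, :email_delivered]
```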

correlation_id - Business Operation Grouping (Optional)

# User upgrades subscription - many events happen
correlation_id = UUIDv7.generate()

Events.emit(:subscription_upgraded, data, correlation_id: correlation_id)
Events.emit(:old_subscription_cancelled, data, correlation_id: correlation_id)
Events.emit(:payment_processed, data, correlation_id: correlation_id)
Events.emit(:invoice_generated, data, correlation_id: correlation_id)

# In logs: filter by correlation_id to see all related events

When to use what

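| Metadata        | Always Auto-Generated | User Can Override | Primary Use Case             |
|-----------------|-----------------------|-------------------|------------------------------|
| data            | No                    | Yes (required)    | Your event payload           |
| event_id        | Yes                   | No                | Event causation tracking     |
| idempotency_key | Yes                   | No                | Prevent duplicate processing |
| causation_id    | No                    | Yes (optional)    | Build event chains           |
| correlation_id  | No                    | Yes (optional)    | Group business operations    |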

Configuration options

Global configuration

Configure the Oban instance and default job options:

defmodule MyApp.Events do
  # Simple: just specify the Oban instance (uses all defaults)
  use ObanEvents, oban: MyApp.Oban

  # Or: specify Oban instance + custom options
  use ObanEvents,
    oban: {MyApp.Oban, queue: :my_events, max_attempts: 5, priority: 1, tags: ["myapp"]}

  @events %{
    # ...
  }
end

Default Oban options: queue: :oban_events, max_attempts: 3, priority: 2.

Per-handler configuration

Override global options for specific handlers using tuple syntax:

@events %{
  user_created: [
    # Critical handler with higher priority and more retries
    {MyApp.EmailHandler, oban: [priority: 0, max_attempts: 10, tags: ["critical", "email"]]},

    # Low-priority handler in different queue
    {MyApp.AnalyticsHandler, oban: [queue: :analytics, priority: 3]},

    # Delayed handler - send reminder email after 24 hours
    {MyApp.ReminderHandler, oban: [schedule_in: {24, :hours}]},

    # Handler using all global defaults
    MyApp.NotificationHandler
  ]
}

Per-handler options:

Any Oban.Job.new/2 option can be passed under the :oban key. Common options, as used in the example above, are :queue, :priority, :max_attempts, :tags, and :schedule_in.
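
One way to picture the override rule is as a layered Keyword.merge: library defaults first, then global options, then per-handler options. The sketch below is an assumption about the precedence, not the library's actual code; OptionMergeSketch and effective_opts/2 are hypothetical names, and the defaults are the ones quoted in this README.

```elixir
defmodule OptionMergeSketch do
  # Defaults quoted in this README.
  @defaults [queue: :oban_events, max_attempts: 3, priority: 2]

  # Normalises a handler entry (a bare module or {module, oban: opts})
  # into {module, effective_opts}.
  def effective_opts(handler, global_opts \\ [])

  def effective_opts({module, handler_opts}, global_opts) when is_atom(module) do
    per_handler = Keyword.get(handler_opts, :oban, [])

    # Later merges win: defaults < global options < per-handler options.
    opts =
      @defaults
      |> Keyword.merge(global_opts)
      |> Keyword.merge(per_handler)

    {module, opts}
  end

  def effective_opts(module, global_opts) when is_atom(module) do
    effective_opts({module, []}, global_opts)
  end
end

{_module, opts} =
  OptionMergeSketch.effective_opts(
    {MyApp.EmailHandler, oban: [priority: 0, max_attempts: 10]},
    queue: :my_events
  )

IO.inspect({opts[:queue], opts[:max_attempts], opts[:priority]})
# => {:my_events, 10, 0}
```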

API

Your events module provides these functions:

emit/2 and emit/3

Emit an event to all registered handlers, with optional metadata in the third argument.

@spec emit(atom(), map()) :: {:ok, [Oban.Job.t()]}
@spec emit(atom(), map(), keyword()) :: {:ok, [Oban.Job.t()]}

# Raises ArgumentError if event is not registered
MyApp.Events.emit(:user_created, %{user_id: 123, email: "user@example.com"})
MyApp.Events.emit(:user_created, %{user_id: 123}, correlation_id: UUIDv7.generate())

get_handlers!/1

Get all handlers registered for an event.

@spec get_handlers!(atom()) :: [module()]

MyApp.Events.get_handlers!(:user_created)
# => [MyApp.EmailHandler, MyApp.AnalyticsHandler]

all_events/0

List all registered event names.

@spec all_events() :: [atom()]

MyApp.Events.all_events()
# => [:user_created, :user_updated, :order_placed]

registered?/1

Check if an event is registered.

@spec registered?(atom()) :: boolean()

MyApp.Events.registered?(:user_created)
# => true

Handler implementation

Handlers must implement the handle_event/2 callback:

defmodule MyApp.AnalyticsHandler do
  use ObanEvents.Handler

  @impl true
  def handle_event(:user_created, %Event{data: data}) do
    %{"user_id" => user_id} = data
    MyApp.Analytics.track("User Created", user_id: user_id)
    :ok
  end

  @impl true
  def handle_event(:user_updated, %Event{data: data}) do
    %{"user_id" => user_id, "changes" => changes} = data
    MyApp.Analytics.track("User Updated", user_id: user_id, changes: changes)
    :ok
  end
end

Return values

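Handlers should return :ok on success. Returning {:error, reason} marks the job as failed so that Oban retries it (see "Return Errors for Retriable Failures" below).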

Handlers may also raise exceptions, which will trigger Oban's retry mechanism.
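
As a rough mental model, the outcome of a handler call can be classified as follows. This is a sketch only: HandlerOutcomeSketch and the :completed / {:retry, reason} tags are hypothetical and do not describe how the dispatch worker is implemented.

```elixir
defmodule HandlerOutcomeSketch do
  # Runs a zero-arity function standing in for a handler call and
  # reports whether the job would complete or be retried.
  def classify(fun) when is_function(fun, 0) do
    case fun.() do
      :ok -> :completed
      {:error, reason} -> {:retry, reason}
    end
  rescue
    # A raised exception is treated like a failed attempt.
    exception -> {:retry, exception}
  end
end

IO.inspect(HandlerOutcomeSketch.classify(fn -> :ok end))
# => :completed
IO.inspect(HandlerOutcomeSketch.classify(fn -> {:error, :rate_limited} end))
# => {:retry, :rate_limited}
```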

Best practices

1. Always Use Transactions

Emit events within transactions to ensure atomicity:

Repo.transact(fn ->
  with {:ok, user} <- Repo.insert(changeset),
       {:ok, _jobs} <- Events.emit(:user_created, %{user_id: user.id}) do
    {:ok, user}
  end
end)
# If insert fails, emit never happens ✅
# If emit fails, transaction rolls back ✅

2. Make Handlers Idempotent

Critical: Handlers may be executed multiple times, for example when Oban retries a job after an error or a crash.

Design handlers to be safe when run multiple times with the same event:

# Example 1: Outbox pattern for external services (RECOMMENDED)
def handle_event(:send_welcome_email, %Event{
  data: data,
  idempotency_key: key,
  event_id: trace_id
}) do
  %{"user_id" => user_id, "email" => email} = data

  # Write to outbox table with idempotency_key
  # Separate worker processes the outbox
  OutboxEmail.insert(%{
    idempotency_key: key,          # Prevents duplicates on retry
    trace_id: trace_id,             # For tracing/correlation
    user_id: user_id,
    email: email,
    template: :welcome,
    status: :pending
  }, on_conflict: :nothing, conflict_target: :idempotency_key)

  :ok
end

# Example 2: Database operations - use upserts
def handle_event(:create_user_profile, %Event{data: data}) do
  %{"user_id" => user_id} = data

  %UserProfile{user_id: user_id}
  |> Repo.insert(
    on_conflict: :nothing,
    conflict_target: :user_id
  )

  :ok
end

# Example 3: Direct external API call (use event_id for tracing)
# Note: Prefer outbox pattern for critical operations
def handle_event(:track_analytics, %Event{
  data: data,
  event_id: trace_id,
  idempotency_key: key
}) do
  # Pass event_id as trace/correlation ID to external service
  AnalyticsAPI.track(
    event: "user_signup",
    properties: data,
    trace_id: trace_id,           # Links logs across systems
    idempotency_key: key          # Service can deduplicate
  )

  :ok
end

# Example 4: Multiple database changes - use Ecto.Multi
# (requires `alias Ecto.Multi` in the handler module)
def handle_event(:subscription_upgraded, %Event{data: data}) do
  %{"user_id" => user_id, "new_plan" => new_plan, "old_plan" => old_plan} = data

  # Use Ecto.Multi for multiple related operations
  Multi.new()
  |> Multi.run(:upgrade, fn _repo, _changes ->
    Subscriptions.upgrade(user_id, new_plan)
  end)
  |> Multi.run(:credits, fn _repo, _changes ->
    Credits.add_bonus(user_id, new_plan)
  end)
  |> Multi.insert(:audit, Audit.changeset(%{
    user_id: user_id,
    old_plan: old_plan,
    new_plan: new_plan
  }))
  |> Repo.transaction()
  |> case do
    {:ok, _} -> :ok
    {:error, _failed_operation, _failed_value, _changes} ->
      {:error, :subscription_upgrade_failed}
  end
end

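# Example 5: Increment operations - use atomic updates
# (requires `import Ecto.Query` for from/2)
def handle_event(:page_view, %Event{data: %{"page_id" => page_id}}) do
  # Atomic, but not idempotent: a retried job increments the counter again.
  # Acceptable for approximate counters; for exact counts, record the
  # idempotency_key first and skip the update if it already exists.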
  Repo.update_all(
    from(p in Page, where: p.id == ^page_id),
    inc: [view_count: 1]
  )

  :ok
end
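
The effect of keying writes on idempotency_key can be seen without a database. The sketch below models the outbox as a plain map and mimics the "skip on conflict" behaviour of the examples above; IdempotentOutboxSketch is a hypothetical module used only for illustration.

```elixir
defmodule IdempotentOutboxSketch do
  # Inserts a row unless its idempotency_key is already present.
  def insert(outbox, %{idempotency_key: key} = row) when is_map(outbox) do
    if Map.has_key?(outbox, key) do
      {:duplicate, outbox}
    else
      {:inserted, Map.put(outbox, key, row)}
    end
  end
end

row = %{idempotency_key: "key-1", user_id: 123, status: :pending}

{:inserted, outbox} = IdempotentOutboxSketch.insert(%{}, row)
# A retry of the same job carries the same key, so nothing new is written.
{:duplicate, outbox} = IdempotentOutboxSketch.insert(outbox, row)

IO.inspect(map_size(outbox))
# => 1
```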

3. Include All Necessary Data

Don't rely on database lookups for data that might change:

# Good: Include all data needed
Events.emit(:status_changed, %{
  record_id: record.id,
  old_status: old_status,
  new_status: record.status
})

# Bad: Handler has to query DB (old_status might be wrong)
Events.emit(:status_changed, %{
  record_id: record.id
})

4. Use JSON-Serializable Data

Event data must be JSON-serializable (no PIDs, refs, or functions):

# Good
Events.emit(:user_created, %{
  user_id: user.id,
  email: user.email,
  amount: Decimal.to_string(user.balance)
})

# Bad: Full struct is not reliably serializable
Events.emit(:user_created, %{user: user})
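
A consequence of JSON storage is that atom keys passed to emit come back as string keys, which is why the handlers in this README match on "user_id" rather than :user_id. The sketch below mimics only that key conversion in plain Elixir; JsonShapeSketch is a hypothetical helper and performs no real JSON encoding (atom values, for instance, would also become strings in a real round trip).

```elixir
defmodule JsonShapeSketch do
  # Recursively converts map keys to strings, approximating the shape
  # of event data after it has been stored as JSON and read back.
  def stringify_keys(%{} = map) when not is_struct(map) do
    Map.new(map, fn {key, value} -> {to_string(key), stringify_keys(value)} end)
  end

  def stringify_keys(list) when is_list(list), do: Enum.map(list, &stringify_keys/1)

  # Scalars (and structs, which should not be emitted) pass through unchanged.
  def stringify_keys(other), do: other
end

emitted = %{user_id: 123, profile: %{email: "user@example.com"}}

IO.inspect(JsonShapeSketch.stringify_keys(emitted))
# => %{"profile" => %{"email" => "user@example.com"}, "user_id" => 123}
```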

5. Return Errors for Retriable Failures

def handle_event(:send_notification, %Event{data: data}) do
  case NotificationService.send(data) do
    {:ok, _} -> :ok
    {:error, :rate_limited} -> {:error, :rate_limited}  # Will retry
    {:error, :invalid_data} -> :ok  # Don't retry invalid data
  end
end

6. Handle permanently failed jobs

When a handler exhausts all retry attempts, you may want to take action like alerting, logging to a dead letter queue, or notifying administrators. Override handle_exhausted/4 in your events module:

defmodule MyApp.Events do
  use ObanEvents, oban: MyApp.Oban

  @events %{
    user_created: [MyApp.EmailHandler]
  }

  # Called when a handler fails after max_attempts
  def handle_exhausted(event_name, handler_module, event, error) do
    # Log to your error tracking service
    Sentry.capture_message("Event handler permanently failed",
      extra: %{
        event: event_name,
        handler: handler_module,
        event_id: event.event_id,
        error: error
      }
    )

    # Store in dead letter queue for manual review
    DeadLetterQueue.insert(%{
      event_name: event_name,
      handler: handler_module,
      event_data: event.data,
      event_id: event.event_id,
      error: inspect(error),
      failed_at: DateTime.utc_now()
    })

    :ok
  end
end

Default behavior:

If you don't override handle_exhausted/4, the worker will log a warning with the event details. This ensures failures are always logged even without custom handling.

The callback receives the event name, the handler module, the Event struct, and the error, in that order.

Handler management

Renaming or removing handler modules

Handler module names are serialized to the database as fully qualified module name strings (e.g., "Elixir.MyApp.EmailHandler"). When you rename or remove a handler module, existing queued jobs will still reference the old module name and will fail when Oban tries to execute them.
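
To see why a stale name fails, consider how a stored string might be turned back into a module. The sketch below is an assumption about the general mechanism, not the library's actual lookup; HandlerResolveSketch and DemoHandler are hypothetical.

```elixir
defmodule DemoHandler do
  # Minimal stand-in for a handler that still exists in the codebase.
  def handle_event(_event_name, _event), do: :ok
end

defmodule HandlerResolveSketch do
  # Resolves a stored module name to a loaded module exporting handle_event/2.
  def resolve(name) when is_binary(name) do
    # Raises ArgumentError if no such atom exists, e.g. after a rename.
    module = String.to_existing_atom(name)

    if Code.ensure_loaded?(module) and function_exported?(module, :handle_event, 2) do
      {:ok, module}
    else
      {:error, :handler_not_found}
    end
  rescue
    ArgumentError -> {:error, :handler_not_found}
  end
end

IO.inspect(HandlerResolveSketch.resolve("Elixir.DemoHandler"))
# => {:ok, DemoHandler}
IO.inspect(HandlerResolveSketch.resolve("Elixir.Some.Removed.Handler"))
# => {:error, :handler_not_found}
```

Keeping an alias or stub module under the old name, as described below, keeps the first branch reachable for jobs that were queued before the change.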

Safe migration strategy:

Use a module alias/stub to maintain backward compatibility during the transition:

Example 1: Renaming a handler

# After renaming MyApp.EmailHandler to MyApp.Notifications.EmailHandler

# 1. Create the new module with your desired name
defmodule MyApp.Notifications.EmailHandler do
  use ObanEvents.Handler

  @impl true
  def handle_event(:user_created, %Event{data: data}) do
    # Your handler logic
    :ok
  end
end

# 2. Keep the old module as an alias
defmodule MyApp.EmailHandler do
  @moduledoc false
  defdelegate handle_event(event, event_struct), to: MyApp.Notifications.EmailHandler
end

# 3. Update your @events to use the new name
defmodule MyApp.Events do
  use ObanEvents, oban: MyApp.Oban

  @events %{
    user_created: [
      MyApp.Notifications.EmailHandler  # New jobs use new name
    ]
  }
end

Example 2: Removing a handler

# Removing MyApp.OldFeatureHandler

# 1. Remove from @events (new jobs won't be created)
defmodule MyApp.Events do
  use ObanEvents, oban: MyApp.Oban

  @events %{
    user_created: [
      # MyApp.OldFeatureHandler removed
      MyApp.EmailHandler
    ]
  }
end

# 2. Keep a stub module to handle old queued jobs gracefully
defmodule MyApp.OldFeatureHandler do
  use ObanEvents.Handler

  @impl true
  def handle_event(_event, _event_struct) do
    # Either process gracefully or just return :ok to discard
    :ok
  end
end

How this works:

  1. Existing queued jobs continue to work (either delegated or gracefully handled)
  2. New jobs use the updated handler list
  3. Zero downtime - no jobs fail during the transition

Cleanup:

After all old jobs have been processed (check the Oban Web UI), you can safely remove the alias/stub module. How long this takes depends on your retry window, that is, on max_attempts, the backoff between attempts, and any schedule_in delays.

Testing

Testing event emission

use Oban.Testing, repo: MyApp.Repo

test "emits user_created event" do
  {:ok, user} = Accounts.create_user(%{email: "test@example.com"})

  assert_enqueued(
    worker: ObanEvents.DispatchWorker,
    args: %{
      "event_name" => "user_created",
      "handler_module" => "Elixir.MyApp.EmailHandler",
      "data" => %{"user_id" => user.id}
    }
  )
end

Testing handlers

Use ObanEvents.Testing to easily create Event structs for testing:

defmodule MyApp.EmailHandlerTest do
  use ExUnit.Case
  import ObanEvents.Testing

  test "sends welcome email" do
    event = build_event(%{"user_id" => 123, "email" => "test@example.com"})

    assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
    assert_email_sent(to: "test@example.com", subject: "Welcome!")
  end

  test "with custom metadata" do
    event = build_event(
      %{"user_id" => 123},
      causation_id: "parent-event-id",
      correlation_id: "test-correlation"
    )

    assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
  end
end

Testing with Oban inline mode

For integration tests, configure Oban to execute jobs inline:

# config/test.exs
config :my_app, Oban,
  testing: :inline,
  queues: false,
  plugins: false

# In test
test "creates user and sends email" do
  {:ok, user} = Accounts.create_user(%{email: "test@example.com"})

  # Email sent immediately in test mode
  assert_email_sent(to: "test@example.com")
end

Observability

Oban Web UI

View event processing history, errors, and retries:

# In router.ex (development only)
import Phoenix.LiveDashboard.Router
import Oban.Web.Router

scope "/dev" do
  pipe_through :browser
  live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry

  # Oban Web is mounted with its router macro rather than with forward
  oban_dashboard "/oban"
end

Logging

ObanEvents logs successful event processing at debug level and failures at error level:

[debug] Processing event: user_created with handler: MyApp.EmailHandler
[debug] Event processed successfully: user_created by MyApp.EmailHandler
[error] Event handler failed: user_created by MyApp.EmailHandler, error: :network_timeout

To enable debug logs at runtime use:

iex> Logger.configure(level: :debug)

Troubleshooting

Events not processing

Check Oban queue configuration:

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [
    oban_events: 10  # Must match the queue your events module uses (default: :oban_events)
  ]

Check job status in database:

SELECT * FROM oban_jobs
WHERE queue = 'oban_events'  -- use your configured queue name
ORDER BY inserted_at DESC
LIMIT 10;

Events not emitted

Verify event is registered:

iex> MyApp.Events.registered?(:user_created)
true

iex> MyApp.Events.all_events()
[:user_created, :user_updated, ...]

Check transaction succeeded:

Add logging to verify the transaction completes:

Repo.transact(fn ->
  with {:ok, user} <- Repo.insert(changeset),
       {:ok, jobs} <- Events.emit(:user_created, data) do
    Logger.info("Emitted #{length(jobs)} jobs")
    {:ok, user}
  end
end)

Handler failures

View errors in Oban Web UI at /dev/oban

Check application logs for handler errors

Manually retry failed job:

iex> job = Oban.Job |> Repo.get(job_id)
iex> Oban.retry_job(job)

Examples

See the repository test suite for complete examples.

Disclaimer

ObanEvents is an independent, community-built library and is not an official Oban project.

Credits

Built with Oban by Parker Selbert.