ExEventBus

ExEventBus provides an event bus that uses the outbox pattern. Behind the scenes, it relies on Oban and ConCache.

Installation

If available in Hex, the package can be installed by adding ex_event_bus to your list of dependencies in mix.exs:

def deps do
  [
    {:ex_event_bus, "~> 0.10.0"}
  ]
end

Run tests

# run only once to setup the test DB
MIX_ENV=test mix test.setup

# actually run the tests
mix test

Setup

  1. Create a module that defines your event bus
  defmodule MyApp.EventBus do
    use ExEventBus, otp_app: :my_app
  end
  1. Add the required config for your EventBus, that is the Oban config
  config :my_app, MyApp.EventBus,
    oban: [
      engine: Oban.Engines.Basic,
      notifier: Oban.Notifiers.Postgres,
      repo: MyApp.Repo,
      plugins: [
        {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(60)},
        {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 7}
      ],
      queues: [
        event_bus: 2
      ]
    ]
  1. Create your first events
  defmodule MyApp.Events do
    use ExEventBus.Event

    defevent(MyEvent)
  end
  1. Create your first event handler
  defmodule MyApp.EventHandler do
    use ExEventBus.EventHandler,
      event_bus: MyApp.EventBus,
      events: [MyApp.Events.MyEvent]

    @impl ExEventBus.EventHandler
    def handle_event(%MyApp.Events.MyEvent{aggregate: %{"id" => aggregate_id}}) do
      # ... handle the event here
    end
  end
  1. Add your event bus to your supervision tree
  # add the event bus to your application children 

  def start(_type, _args) do 
    # ... 

    children = [
      # ...
      MyApp.EventBus,
      # ...
    ]

    # ...
  1. Add your event handlers to your supervision tree
  def start(_type, _args) do
    # ...

    children = [
      # ...
      {MyApp.EventHandler, [event_bus: MyApp.EventBus]},
      # ...
    ]

    # ...
  end

Event Structure

Events published by ExEventBus contain:

How It Works

changes and initial_data directly mirror your Ecto changeset - only fields that actually changed are included:

# Simple update
%MyEvent{
  aggregate: %User{id: 1, name: "Jane Doe", email: "jane.doe@example.com"},
  changes: %{"email" => "jane.doe@example.com"},
  initial_data: %{"email" => "jane@example.com"},
  metadata: nil
}

INSERT Operations

%UserCreated{
  aggregate: %User{id: 1, name: "John", email: "john@example.com"},
  changes: %{
    "name" => "John",
    "email" => "john@example.com"
  },
  initial_data: %{
    "name" => nil,
    "email" => nil
  }
}

UPDATE Operations

%UserUpdated{
  aggregate: %User{id: 1, email: "new@example.com", age: 30},
  changes: %{
    "email" => "new@example.com",
    "age" => 30
  },
  initial_data: %{
    "email" => "old@example.com",
    "age" => 25
  }
}

DELETE Operations

%UserDeleted{
  aggregate: %User{id: 1, name: "John"},
  changes: %{},
  initial_data: %{}
}

Association Changes

Associations in changes and initial_data work the same way - they mirror what's in the changeset:

Creating with Associations

# Ecto operation
user_changeset = User.changeset(%User{}, %{
  name: "Alice",
  email: "alice@example.com",
  profile: %{bio: "Software Engineer"},
  posts: [
    %{title: "First Post", body: "Hello World"}
  ]
})

Repo.insert(user_changeset, success_event: UserCreated)

# Published event
%UserCreated{
  changes: %{
    "name" => "Alice",
    "email" => "alice@example.com",
    "profile" => %{"bio" => "Software Engineer"},
    "posts" => [
      %{"title" => "First Post", "body" => "Hello World"}
    ]
  },
  initial_data: %{
    "name" => nil,
    "email" => nil,
    "profile" => nil,
    "posts" => []
  }
}

Updating Associations

# Ecto operation
user = Repo.get(User, 1) |> Repo.preload([:profile, :posts])

user_changeset = User.changeset(user, %{
  profile: %{id: user.profile.id, bio: "Senior Engineer"},
  posts: [
    %{id: 10, title: "Updated Title"},
    %{title: "New Post", body: "New Content"}
  ]
})

Repo.update(user_changeset, success_event: UserUpdated)

# Published event
%UserUpdated{
  changes: %{
    "profile" => %{"bio" => "Senior Engineer"},
    "posts" => [
      %{"title" => "Updated Title"},
      %{"title" => "New Post", "body" => "New Content"}
    ]
  },
  initial_data: %{
    "profile" => %{"bio" => "Engineer"},
    "posts" => [
      %{"title" => "Original Title"},
      %{}  # New item - no previous value
    ]
  }
}

Using in Event Handlers

Simply check if fields exist in changes:

def handle_event(%UserUpdated{changes: changes, initial_data: initial_data}) do
  # React to email changes
  if Map.has_key?(changes, "email") do
    send_email_change_notification(
      old_email: initial_data["email"],
      new_email: changes["email"]
    )
  end

  # React to association changes
  if Map.has_key?(changes, "posts") do
    notify_posts_changed()
  end
end

Supported Field Types

ExEventBus fully supports all Ecto field types, including:

Primitive array fields (like field(:tags, {:array, :string}, default: [])) are properly tracked as field changes, while association arrays are tracked with individual item primary keys.

Usage with Ecto Operations

To publish events from Ecto operations, pass the event module using the :success_event option:

# Insert
Repo.insert(changeset, success_event: MyApp.Events.UserCreated)

# Update
Repo.update(changeset, success_event: MyApp.Events.UserUpdated)

# Delete
Repo.delete(user, success_event: MyApp.Events.UserDeleted)

The event is only published if the operation succeeds.