WalEx

Postgres Change Data Capture (CDC) in Elixir.

WalEx allows you to listen to change events on your Postgres tables then perform callback-like actions with the data. For example:

You can learn more about CDC and what you can do with it here: Why capture changes?

Credit

This library steals liberally from realtime from Supabase, which in turn draws heavily on cainophile.

Installation

If available in Hex, the package can be installed by adding walex to your list of dependencies in mix.exs:

def deps do
  [
    {:walex, "~> 3.0.4"}
  ]
end

PostgreSQL Configuration

Logical Replication

WalEx only supports PostgreSQL. To get started, you first need to configure PostgreSQL for logical replication:

ALTER SYSTEM SET wal_level = 'logical';

Docker Compose:

command: [ "postgres", "-c", "wal_level=logical" ]

Publication

When you change the wal_level variable, you'll need to restart your PostgreSQL server. Once you've restarted, go ahead and create a publication for the tables you want to receive changes for:

All tables:

CREATE PUBLICATION events FOR ALL TABLES;

Or just specific tables:

CREATE PUBLICATION events FOR TABLE user, todo;

Filter based on row conditions (Postgres v15+ only):

CREATE PUBLICATION user_event FOR TABLE user WHERE (active IS TRUE);

Replica Identity

WalEx supports all of the settings for REPLICA IDENTITY. Use FULL if you can use it, as it will make tracking differences easier as the old data will be sent alongside the new data. You'll need to set this for each table.

Specific tables:

ALTER TABLE user REPLICA IDENTITY FULL;
ALTER TABLE todo REPLICA IDENTITY FULL;

Also, be mindful of replication gotchas.

AWS RDS

Amazon (AWS) RDS Postgres allows you to configure logical replication.

When creating a new Postgres database on RDS, you'll need to set a Parameter Group with the following settings:

rds.logical_replication = 1
max_replication_slots = 5
max_slot_wal_keep_size = 2048

Usage

Config

# config.exs

config :my_app, WalEx,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  port: "5432",
  database: "postgres",
  publication: "events",
  subscriptions: [:user, :todo],
  # modules are optional; WalEx assumes your module names match
  # this pattern: MyApp.Events.User, MyApp.Events.ToDo, etc
  # but you can also specify custom modules like so:
  modules: [MyApp.CustomModule, MyApp.OtherCustomModule],
  name: MyApp

It is also possible to just define the URL configuration for the database

# config.exs

config :my_app, WalEx,
  url: "postgres://username:password@hostname:port/database"
  publication: "events",
  subscriptions: [:user, :todo],
  name: MyApp

You can also dynamically update the config at runtime:

WalEx.Configs.add_config(MyApp, :subscriptions, ["new_subscriptions_1", "new_subscriptions_2"])
WalEx.Configs.remove_config(MyApp, :subscriptions, "subscriptions")
WalEx.Configs.replace_config(MyApp, :password, "new_password")

Supervisor

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {WalEx.Supervisor, Application.get_env(:my_app, WalEx)}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Examples

If your app is named MyApp and you have a subscription called :user (which represents a database table), WalEx assumes you have a module called MyApp.Events.User that uses WalEx Event. But you can also define any custom module, just be sure to add it to the modules config.

Note that the result of the :ok tuple is a list. This is because WalEx returns a List of changes form a database transaction for a particular table. Often times this will just contain one result, but it could be many (for example, if you use database triggers to update a column after an insert).

DSL:

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  # any event
  on_event(:user, fn {:ok, users} ->
    IO.inspect(on_event: users)
    # do something with users data (Event Struct)
  end)

  on_insert(:user, fn {:ok, users} ->
    IO.inspect(on_insert: users)
  end)

  on_update(:user, fn {:ok, users} ->
    IO.inspect(on_update: users)
  end)

  on_delete(:user, fn {:ok, users} ->
    IO.inspect(on_delete: users)
  end)

Additional filter helpers available in the WalEx.TransactionFilter module.

Event

The returned data is a List of %Event{} Structs with changes provided by the map_diff library (UPDATE example where name field was changed):

[
  %Event{
    type: :update,
    # the new record
    record: %{
      id: 1234,
      name: "Chase Pursley",
      ...
    },
    old_record: %{
      id: 1234,
      name: "Chase",
      ...
    },
    # changes provided by the map_diff library,
    changes: %{
      name: %{
        added: "Chase Pursley",
        changed: :primitive_change,
        removed: "Chase"
      }
    },
    commit_timestamp: ~U[2023-12-06 14:32:49Z]
  }
]

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/walex.