TdsCdc
Change Data Capture for SQL Server in Elixir.
TdsCdc captures row-level changes (INSERT, UPDATE, DELETE) from SQL Server tables with CDC enabled. It periodically polls CDC change tables and publishes events to subscribed processes.
Requirements
- Elixir ~> 1.18
- SQL Server 2016+ with CDC enabled
- SQL Server Agent (sqlagent) running (required by CDC)
Installation
Add to mix.exs:
def deps do
[
{:tds_cdc, "~> 0.1.0"}
]
end
For Ecto integration, also add:
def deps do
[
{:tds_cdc, "~> 0.1.0"},
{:ecto_sql, "~> 3.0"},
{:tds_ecto, "~> 2.3"}
]
end
SQL Server Setup
Enable CDC on the database
USE my_database;
GO
EXEC sys.sp_cdc_enable_db;
GO
Enable CDC on a table
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'users',
@role_name = NULL; -- NULL = unrestricted access
GO
This creates a capture instance named dbo_users and a table cdc.dbo_users_CT where SQL Server stores the changes.
The capture instance name follows the pattern <schema>_<table> — so dbo.users becomes dbo_users. You use this name everywhere in TdsCdc: capture_instances: ["dbo_users"], TdsCdc.subscribe("dbo_users"), etc. You can also specify a custom name via @capture_instance when enabling CDC.
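The default naming rule is mechanical. The sketch below expresses it as two pure functions, assuming no custom @capture_instance was given. CdcNaming is a hypothetical module for illustration, not part of TdsCdc.

```elixir
defmodule CdcNaming do
  # Hypothetical helper (not part of TdsCdc): the default capture instance
  # name SQL Server derives when @capture_instance is omitted.
  def capture_instance(schema, table), do: "#{schema}_#{table}"

  # The change table SQL Server creates for a given capture instance.
  def change_table(capture_instance), do: "cdc.#{capture_instance}_CT"
end
```

For example, CdcNaming.capture_instance("dbo", "users") gives "dbo_users", and CdcNaming.change_table("dbo_users") gives "cdc.dbo_users_CT".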
Warning: schema changes are not propagated to CDC tables. If you add or remove columns in the source table (dbo.users), the _CT table will not reflect the new schema. You must either:
- Disable and re-enable CDC. All historical change data for the capture instance is lost:
EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'users', @capture_instance = N'dbo_users';
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'users', @role_name = NULL;
- Create a second capture instance, which keeps the old one alive during the transition:
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'users', @capture_instance = N'dbo_users_v2', @role_name = NULL;
Then switch TdsCdc to capture_instances: ["dbo_users_v2"] and disable the old instance when ready. SQL Server allows at most two capture instances per source table, so the old instance must be disabled before another one can be created.
Connection adapters
TdsCdc supports two connection modes:
Option A: Direct TDS connection (default)
Manages its own connection pool. No external dependencies beyond tds.
{:ok, pid} = TdsCdc.start_link(
conn: [
hostname: "localhost",
port: 1433,
username: "sa",
password: "YourStrong!Passw0rd",
database: "my_database"
],
capture_instances: ["dbo_users"],
poll_interval: 1_000
)
The :conn options are forwarded to Tds.start_link/1. Additional pool options:
| Option | Default | Description |
|---|---|---|
| :pool_size | 5 | Number of connections in the pool |
| :ownership_timeout | 30000 | Maximum time (ms) a connection can be checked out |
| :timeout | 30000 | Query timeout (ms) |
Option B: Ecto.Repo
Uses an existing Ecto.Repo for all queries. Shares the Repo's connection pool — no separate TDS connection needed.
# In your application.ex supervision tree:
children = [
MyApp.Repo,
{TdsCdc.Client, repo: MyApp.Repo, capture_instances: ["dbo_users"]}
]
# Or at runtime:
{:ok, pid} = TdsCdc.start_link(
repo: MyApp.Repo,
capture_instances: ["dbo_users"]
)
Requirements: The Repo must use the TDS adapter (tds_ecto) and be started before TdsCdc.
Usage
Subscribe to changes
:ok = TdsCdc.subscribe("dbo_users")
receive do
  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :insert} = change} ->
    IO.puts("New user: #{change.data.name}")
  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :update} = change} ->
    IO.puts("User updated: #{inspect(change.data)}")
  {:tds_cdc_change, "dbo_users", %TdsCdc.Change{operation: :delete} = change} ->
    IO.puts("User deleted: #{inspect(change.data)}")
end
Unsubscribe
:ok = TdsCdc.unsubscribe("dbo_users")
Query current LSN position
The LSN (Log Sequence Number) position marks how far the client has read through a capture instance's changes in the SQL Server transaction log. Use it to check which changes have already been delivered.
{:ok, lsn} = TdsCdc.current_lsn("dbo_users")
Stop the client
Gracefully stops the CDC client process, cancels its polling timer, and (if using :conn) closes the database connection. Subscribers will no longer receive change events.
:ok = TdsCdc.stop()
Check CDC status (utility functions)
These functions accept either a TDS connection pid or an Ecto.Repo module:
# With TDS connection
{:ok, conn} = Tds.start_link(conn_opts)
{:ok, true} = TdsCdc.cdc_enabled?(conn)
{:ok, ["dbo_users", "dbo_orders"]} = TdsCdc.list_capture_instances(conn)
GenServer.stop(conn)
# With Ecto.Repo
{:ok, true} = TdsCdc.cdc_enabled?(MyApp.Repo)
{:ok, ["dbo_users"]} = TdsCdc.list_capture_instances(MyApp.Repo)
Wait for client to be ready
{:ok, pid} = TdsCdc.start_link(conn: conn_opts, capture_instances: ["dbo_users"])
:ok = TdsCdc.wait_for_ready(timeout: 10_000, capture_instance: "dbo_users")
TdsCdc.subscribe("dbo_users")
LSN persistence
By default, TdsCdc persists LSN positions to disk so they survive application restarts. When the client starts, it loads saved positions and resumes from where it left off (provided the positions are still within the CDC retention window).
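The resume rule in the paragraph above can be sketched as a pure function. This is a minimal sketch under stated assumptions, not TdsCdc's code: ResumeSketch is hypothetical, LSNs are taken to be the 10-byte binaries that appear in the lsn field of %TdsCdc.Change{}, and min_lsn is assumed to come from SQL Server's sys.fn_cdc_get_min_lsn.

```elixir
defmodule ResumeSketch do
  # Hypothetical module, not part of TdsCdc.
  # LSNs are assumed to be equal-length binaries. Erlang term order compares
  # binaries byte by byte, so <, == and > work directly on them.

  # Nothing saved: this sketch chooses to replay all retained history.
  def resume_from(nil, min_lsn), do: {:ok, min_lsn}

  # Saved position is older than the oldest retained change: data was purged.
  def resume_from(saved, min_lsn) when is_binary(saved) and saved < min_lsn,
    do: {:gap, saved, min_lsn}

  # Saved position is still inside the retention window: resume from it.
  def resume_from(saved, _min_lsn) when is_binary(saved), do: {:ok, saved}
end
```

The {:gap, saved, min_lsn} result corresponds to the situation described under Gap detection. The check is simplified: a saved LSN just below min_lsn with no purged changes in between would still be reported as a gap. A real client might also start from the current maximum LSN instead of min_lsn when nothing was saved.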
Default: file-based persistence
Positions are saved as JSON files in <system_tmp>/tds_cdc/<client_name>.json:
# Default (automatic)
TdsCdc.start_link(conn: [...], capture_instances: ["dbo_users"])
# Custom path
TdsCdc.start_link(
conn: [...],
capture_instances: ["dbo_users"],
persistence: {TdsCdc.Persistence.File, path: "/var/lib/myapp/lsn"}
)
Custom persistence module
Implement the TdsCdc.Persistence behaviour to store positions in a database, Redis, or any other backend:
defmodule MyApp.DbPersistence do
@behaviour TdsCdc.Persistence
@impl true
def save_positions(_name, positions) do
# Write positions to your database
:ok
end
@impl true
def load_positions(_name) do
# Read positions from your database
{:ok, %{}} # or {:error, :not_found}
end
end
TdsCdc.start_link(
conn: [...],
capture_instances: ["dbo_users"],
persistence: {MyApp.DbPersistence, []}
)
Disable persistence
If you don't need positions to survive restarts:
TdsCdc.start_link(
conn: [...],
capture_instances: ["dbo_users"],
persistence: nil
)
Multiple instances
You can run multiple clients with different configurations:
{:ok, pid_fast} = TdsCdc.start_link(
name: TdsCdc.Fast,
conn: [hostname: "localhost", ...],
capture_instances: ["dbo_users"],
poll_interval: 100
)
{:ok, pid_slow} = TdsCdc.start_link(
name: TdsCdc.Slow,
repo: MyApp.Repo,
capture_instances: ["dbo_orders"],
poll_interval: 5_000
)
TdsCdc.subscribe(TdsCdc.Fast, "dbo_users")
TdsCdc.subscribe(TdsCdc.Slow, "dbo_orders")
Each instance tracks its own LSN position independently.
Gap detection
SQL Server purges old CDC data based on the configured retention period (default: 3 days). If a client falls behind the oldest available change data, TdsCdc detects the gap and:
- Sends {:tds_cdc_gap_detected, capture_instance, old_lsn, min_lsn} to all subscribers
- Logs a warning
- Resets the position to the current min_lsn and continues from there
require Logger
receive do
  {:tds_cdc_gap_detected, ci, old_lsn, min_lsn} ->
    Logger.warning("Data lost in #{ci} between #{inspect(old_lsn)} and #{inspect(min_lsn)}")
end
Change struct
%TdsCdc.Change{
capture_instance: "dbo_users",
operation: :insert, # :insert | :update | :delete
data: %{id: 1, name: "Alice", email: "alice@example.com"},
lsn: <<0, 0, 0, 42, 0, 0, 11, 128, 0, 82>>,
lsn_prev: nil,
seqval: <<0, 0, 0, 42, 0, 0, 11, 128, 0, 83>>,
commit_lsn: nil,
transaction_order: nil
}
Note on UPDATE operations: CDC records __$operation = 3 for the before image and __$operation = 4 for the after image. Both are mapped to :update.
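The mapping from the __$operation column to the :operation field can be sketched as below. OperationSketch is a hypothetical module, not TdsCdc's internal code; the codes 1 to 4 are the values SQL Server uses in the __$operation column of a _CT table.

```elixir
defmodule OperationSketch do
  # Hypothetical helper, not TdsCdc's implementation.
  # SQL Server __$operation codes:
  #   1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image)
  def to_atom(1), do: :delete
  def to_atom(2), do: :insert
  # Both update images collapse to :update.
  def to_atom(op) when op in [3, 4], do: :update
end
```

With the row filter N'all', cdc.fn_cdc_get_all_changes_<capture_instance> returns only the after image (4) for each updated row; with N'all update old' it returns both images, in which case this mapping turns one updated row into two :update events.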
Architecture
When you enable CDC on a table (e.g. dbo.users), SQL Server creates a change table named cdc.<schema>_<table>_CT (e.g. cdc.dbo_users_CT). The SQL Server Agent (CDC capture job) reads the transaction log and populates these _CT tables with every INSERT, UPDATE, and DELETE as they occur. Each _CT row includes metadata columns (__$operation, __$start_lsn, __$seqval) plus all the tracked table's columns.
TdsCdc periodically queries these _CT tables through the cdc.fn_cdc_get_all_changes_<capture_instance> table-valued functions, starting just after the last LSN it processed. It advances its LSN position after each poll, so changes that were already delivered are not returned by later polls.
SQL Server Elixir - TdsCdc
┌────────────┐ ┌─────────────┐ ┌───────────────────────────┐
│ dbo.users  │───►│ Transaction │ │ TdsCdc.Client (GenServer) │
│ dbo.orders │ │ Log │ │ │
└────────────┘ └────┬────────┘ │ Connection Adapter │
│ │ ┌──────────────────────┐ │
▼ │ │ TdsCdc.Connection.Tds│ │
┌────────────┐ │ │ -or- │ │
│ CDC Agent │ │ │ TdsCdc.Connection. │ │
│ (sqlagent) │ │ │ Ecto │ │
└─────┬──────┘ │ └──────────────────────┘ │
│ │ │
▼ │ :poll ───► fetch_changes │
┌───────────────────┐ │ ───► %Change{} │
│ cdc.dbo_users_CT │ │ ───► send to subs │
│ cdc.dbo_orders_CT │ │ │
└───────────────────┘ │ lsn_positions tracker │
│ subscribers registry │
└─────────────┬─────────────┘
│ send/2
▼
┌──────────┐
│ App │
│ Consumer │
└──────────┘
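A poll for one capture instance comes down to a single query against its change-table function. The sketch below builds that statement. CdcQuerySketch is hypothetical, and the @1/@2 positional parameter names are an assumption about how the statement would be sent through tds. Because the capture instance is part of the function name, it cannot be bound as a parameter, so the sketch validates it before interpolating it.

```elixir
defmodule CdcQuerySketch do
  # Hypothetical helper, not TdsCdc's implementation.
  # Builds the statement for one poll of one capture instance.
  # Assumed parameters: @1 = last delivered LSN, @2 = upper bound LSN
  # (for example the value returned by sys.fn_cdc_get_max_lsn()).
  def changes_query(capture_instance) when is_binary(capture_instance) do
    # The name is spliced into the function name, so only plain
    # identifiers are accepted (a conservative rule chosen for this sketch).
    if not Regex.match?(~r/\A[A-Za-z_][A-Za-z0-9_]*\z/, capture_instance) do
      raise ArgumentError, "invalid capture instance name: #{inspect(capture_instance)}"
    end

    # fn_cdc_increment_lsn moves past the last delivered LSN so that row
    # is not fetched again.
    """
    DECLARE @from_lsn binary(10) = sys.fn_cdc_increment_lsn(@1);
    SELECT *
    FROM cdc.fn_cdc_get_all_changes_#{capture_instance}(@from_lsn, @2, N'all')
    ORDER BY __$start_lsn, __$seqval;
    """
  end
end
```

The change function raises an error when the requested LSN range falls outside the retained data, which is one reason a poller checks the saved position against sys.fn_cdc_get_min_lsn first and skips the query when the maximum LSN has not moved past it.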
Structured listener with use TdsCdc.Listener
For a more structured approach, use the TdsCdc.Listener behaviour. It auto-starts a CDC client, subscribes, and dispatches changes to your callbacks:
defmodule MyApp.CdcListener do
use TdsCdc.Listener
require Logger
@impl true
def on_init(_opts) do
{:ok, %{inserts: 0, updates: 0, deletes: 0}}
end
@impl true
def on_insert(change, state) do
IO.puts("New record: #{inspect(change.data)}")
{:ok, %{state | inserts: state.inserts + 1}}
end
@impl true
def on_update(change, state) do
IO.puts("Updated: #{inspect(change.data)}")
{:ok, %{state | updates: state.updates + 1}}
end
@impl true
def on_delete(change, state) do
IO.puts("Deleted: #{inspect(change.data)}")
{:ok, %{state | deletes: state.deletes + 1}}
end
@impl true
def on_gap(ci, _old_lsn, _min_lsn, state) do
Logger.warning("Gap detected in #{ci}")
{:ok, state}
end
end
Add to your supervision tree:
children = [
{MyApp.CdcListener, conn: [hostname: "localhost", ...], capture_instances: ["dbo_users"]}
]
All callbacks are optional and have default implementations. Return {:ok, state} to continue or {:stop, reason} to stop the listener.
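The return contract can be illustrated with a tiny dispatcher. DispatchSketch and CountingListener are hypothetical and do not reflect how TdsCdc.Listener is implemented; the sketch only shows how an :operation value selects a callback and how {:ok, state} and {:stop, reason} flow back to the caller. Callbacks the module does not define fall back to {:ok, state}, standing in for the default implementations.

```elixir
defmodule DispatchSketch do
  # Hypothetical dispatcher, not TdsCdc.Listener's code. Works on any map
  # with an :operation key, which includes %TdsCdc.Change{} structs.
  def dispatch(mod, %{operation: op} = change, state) do
    callback =
      case op do
        :insert -> :on_insert
        :update -> :on_update
        :delete -> :on_delete
      end

    # Emulate optional callbacks: skip the ones the module does not export.
    if function_exported?(mod, callback, 2) do
      apply(mod, callback, [change, state])
    else
      {:ok, state}
    end
  end
end

defmodule CountingListener do
  # Hypothetical listener: counts inserts and stops on the first delete.
  def on_insert(_change, state), do: {:ok, Map.update(state, :inserts, 1, &(&1 + 1))}
  def on_delete(_change, _state), do: {:stop, :delete_seen}
end
```

For example, DispatchSketch.dispatch(CountingListener, %{operation: :insert, data: %{id: 1}}, %{}) returns {:ok, %{inserts: 1}}, while a :delete change returns {:stop, :delete_seen}.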
Docker example
cd example
docker compose up --build
Spins up SQL Server 2022 with CDC enabled, a web app with CRUD for users on port 4000, and a listener that prints CDC changes in real time.
- Web app: http://localhost:4000 — create, edit, and delete users
- Listener — prints INSERT/UPDATE/DELETE events captured via CDC
Environment variables
| Variable | Default | Description |
|---|---|---|
| TDS_HOST | localhost | SQL Server host |
| TDS_PORT | 1433 | SQL Server port |
| TDS_USERNAME | sa | Username |
| TDS_PASSWORD | YourStrong!Passw0rd | Password |
| TDS_DATABASE | cdc_example | Database name |
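Connection options that honour these variables could be built as in the sketch below. ExampleConfig is hypothetical (the example app may read its settings differently); it takes the environment as a map so it can be exercised without touching real variables, and falls back to the defaults listed in the table.

```elixir
defmodule ExampleConfig do
  # Hypothetical helper mirroring the defaults in the table above.
  # System.get_env/0 returns all environment variables as a map.
  def conn_opts(env \\ System.get_env()) do
    [
      hostname: Map.get(env, "TDS_HOST", "localhost"),
      # Environment values are strings; the port must be an integer.
      port: env |> Map.get("TDS_PORT", "1433") |> String.to_integer(),
      username: Map.get(env, "TDS_USERNAME", "sa"),
      password: Map.get(env, "TDS_PASSWORD", "YourStrong!Passw0rd"),
      database: Map.get(env, "TDS_DATABASE", "cdc_example")
    ]
  end
end
```

The result can be passed straight to the :conn option, for example TdsCdc.start_link(conn: ExampleConfig.conn_opts(), capture_instances: ["dbo_users"]).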
Modules
| Module | Description |
|---|---|
| TdsCdc | Public API (start_link, stop, subscribe, unsubscribe, current_lsn, wait_for_ready, cdc_enabled?, list_capture_instances) |
| TdsCdc.Client | GenServer that manages the connection and polling |
| TdsCdc.Listener | Behaviour for structured CDC event listeners |
| TdsCdc.Connection | Behaviour for database connection adapters |
| TdsCdc.Connection.Tds | Direct TDS connection adapter (default) |
| TdsCdc.Connection.Ecto | Ecto.Repo connection adapter |
| TdsCdc.Change | Struct representing a change event |
| TdsCdc.Lsn | Utilities for Log Sequence Numbers |
| TdsCdc.Persistence | Behaviour for LSN position persistence |
| TdsCdc.Persistence.File | Default file-based LSN persistence |
License
MIT