DuckFeeder 🦆


Stream every change from your Postgres database into a DuckDB-queryable lakehouse — with one Elixir module.

DuckFeeder connects to Postgres logical replication (WAL/CDC), writes Parquet files to object storage (S3/GCS), and commits metadata to DuckLake — all inside your OTP supervision tree. The result is a continuously updated analytic copy of your production data that DuckDB can query directly, with no ETL pipelines, no Kafka, and no Spark jobs.

┌──────────────┐       ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│  Postgres    │  WAL  │  DuckFeeder  │ .parq │    S3/GCS    │ read  │   DuckDB     │
│  (source)    │──────▶│  (Elixir)    │──────▶│  (storage)   │◀──────│  (analytics) │
│              │       │              │──┐    │              │       │              │
└──────────────┘       └──────────────┘  │    └──────────────┘       └──────────────┘
                                         │    ┌──────────────┐
                                         └───▶│  Postgres    │
                                     metadata │  (DuckLake)  │
                                              └──────────────┘

Why?

Most teams eventually need analytics on their production data. The usual path is a warehouse, an ETL tool, and a pipeline to keep them in sync. DuckFeeder collapses all of that into a library you add to your existing Elixir app.


Quick start

1. Migration

defmodule MyApp.Repo.Migrations.AddDuckFeeder do
  use Ecto.Migration

  def up, do: DuckFeeder.Migrations.up(repo: repo())
  def down, do: DuckFeeder.Migrations.down(repo: repo())
end

2. Config

# config/runtime.exs
config :my_app, MyApp.DuckFeeder,
  enabled: System.get_env("DUCK_FEEDER_ENABLED") == "true",
  repo: MyApp.Repo,
  schemas: [MyApp.Users, MyApp.Orders, MyApp.Products],
  storage: %{
    provider: :s3,
    bucket: System.fetch_env!("DUCK_FEEDER_BUCKET"),
    access_key_id: System.fetch_env!("AWS_ACCESS_KEY_ID"),
    secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY")
  }

3. Runtime module

defmodule MyApp.DuckFeeder do
  use DuckFeeder.Runtime, otp_app: :my_app
end

4. Supervise it

# application.ex
children = [
  MyApp.Repo,
  MyAppWeb.Endpoint,
  MyApp.DuckFeeder
]

That's it. DuckFeeder will create the replication slot, start streaming WAL changes, write Parquet batches to S3, and commit DuckLake metadata — all automatically.
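
If you want to confirm that replication is actually live, you can inspect the source database directly. These are standard Postgres catalog views, not DuckFeeder-specific APIs (the exact slot and publication names depend on your configuration):

```sql
-- Check the logical replication slot DuckFeeder created
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots;

-- Check the publication and the tables it covers
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
```

An `active = true` slot whose `restart_lsn` advances over time indicates the stream is being consumed and acknowledged.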


Query with DuckDB

The whole point is making your data queryable. Once DuckFeeder is running, open a DuckDB session and attach the lakehouse:

-- Connect DuckDB to your DuckLake metadata
ATTACH 'ducklake:postgres:host=localhost dbname=my_app_dev' AS lake;

-- Browse what's there
SHOW ALL TABLES;

-- Query CDC data directly — DuckDB reads Parquet from S3 automatically
SELECT
    user_id,
    count(*) AS order_count,
    sum(total_cents) / 100.0 AS revenue
FROM lake.raw.orders
WHERE _df_op = 'insert'
GROUP BY user_id
ORDER BY revenue DESC
LIMIT 10;

-- Time-travel: see the state at a previous snapshot
FROM ducklake_snapshot_at(lake, 42, 'raw', 'orders')
SELECT count(*);

-- Changelog queries — reconstruct what changed and when
SELECT _df_op, _df_timestamp, id, status
FROM lake.raw.orders
WHERE id = 1234
ORDER BY _df_timestamp;

Every row includes CDC metadata columns (_df_op, _df_timestamp, _df_lsn) so you can distinguish inserts from updates from deletes, reconstruct change history, and build incremental materializations.
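
For example, the metadata columns let you collapse the change stream back into the current state of a table. A sketch, assuming id is the primary key (EXCLUDE is DuckDB's syntax for dropping columns from SELECT *):

```sql
-- Latest version of each row, with deleted rows filtered out
SELECT * EXCLUDE (rn, _df_op, _df_timestamp, _df_lsn)
FROM (
    SELECT *,
           row_number() OVER (PARTITION BY id ORDER BY _df_timestamp DESC) AS rn
    FROM lake.raw.orders
) AS versioned
WHERE rn = 1
  AND _df_op <> 'delete';
```

This is the usual building block for incremental materializations: keep the newest change per key, then drop keys whose last operation was a delete.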


Append streams (non-CDC events)

Push telemetry, audit logs, or domain events through the same pipeline — no CDC required:

# Start an append stream for custom events.
# meta_conn is a connection to the DuckLake metadata Postgres;
# storage_config is the same storage map used in your runtime config.
{:ok, stream} = DuckFeeder.start_append_stream(
  designated_tables: [%{id: 1, target_schema: "raw", target_table: "app_events"}],
  meta_conn: meta_conn,
  storage: storage_config
)

# Push events from anywhere in your app
DuckFeeder.append_event(stream, "app_events", %{
  "type" => "page_view",
  "path" => "/dashboard",
  "user_id" => user_id,
  "at" => DateTime.utc_now()
})

Then query them the same way:

SELECT date_trunc('hour', at) AS hour, count(*) AS views
FROM lake.raw.app_events
WHERE type = 'page_view'
GROUP BY 1
ORDER BY 1;

Safe telemetry forwarding

Forward Phoenix/Ecto telemetry events without recursive ingestion loops:

DuckFeeder.start_telemetry_forwarder(
  stream: stream,
  table: "app_events",
  events: [
    [:phoenix, :endpoint, :stop],
    [:ecto, :repo, :query]
  ],
  summarize_duck_feeder?: true
)

How it works

Postgres WAL ──▶ CDC.Connection ──▶ Service ──▶ TablePipeline(s)
                                                       │
                                                flush batches
                                                       │
                                                       ▼
                                               BatchProcessor
                                          (encode → upload → commit)
                                                       │
                                    ┌──────────────────┼──────────────────┐
                                    ▼                  ▼                  ▼
                              Write Parquet     Upload to S3/GCS   Commit DuckLake
                              (Rust NIF)                           metadata to PG
                                                                          │
                                                                          ▼
                                                               Checkpoint LSN in PG
                                                                          │
                                                                          ▼
                                                               Ack to CDC.Connection

Key design choices:

If you need to build the NIF from source (for local development or unsupported targets), set DUCK_FEEDER_BUILD_NIF=1.

For maintainers: precompiled releases require publishing NIF artifacts (see .github/workflows/build_precompiled_nifs.yml) and including generated checksum-*.exs files in the Hex package (mix rustler_precompiled.download DuckFeeder.Writer.ParquetNif --all --print).


Schema inference

DuckFeeder infers table configuration from your Ecto schemas:

Inferred from               Used for
__schema__(:source)         Source table name
__schema__(:prefix)         Source schema (default: "public")
__schema__(:primary_key)    Primary key columns

Override anything per-schema:

schemas: [
  MyApp.Users,
  {MyApp.Orders, target_table: "order_events", target_schema: "analytics"},
  {MyApp.InternalAudit, enabled?: false}
]

Production configuration

config :my_app, MyApp.DuckFeeder,
  repo: MyApp.Repo,
  schemas: [MyApp.Users, MyApp.Orders],
  storage: %{
    provider: :s3,
    bucket: "my-data-lake",
    access_key_id: System.fetch_env!("AWS_ACCESS_KEY_ID"),
    secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY")
  },
  # Replication slot/publication names (auto-generated if omitted)
  slot_name: "duck_feeder_prod_slot",
  publication_name: "duck_feeder_prod_pub",
  # Backpressure — effectively required for bounded WAL retention
  runtime_opts: [
    max_lag_bytes: 128 * 1024 * 1024,
    backpressure_lag_bytes: 64 * 1024 * 1024
  ]
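
To see how these thresholds relate to actual replication lag, you can measure the WAL retained behind the slot on the source database (standard Postgres functions, nothing DuckFeeder-specific):

```sql
-- Bytes of WAL retained behind each replication slot
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```

If retained WAL keeps growing toward max_lag_bytes, the consumer is falling behind and backpressure will engage.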

Migration ordering

If you rely on schema-change directives, deploy DuckFeeder first, then run migrations:

  1. Deploy with DuckFeeder enabled.
  2. Confirm replication is live.
  3. Run Ecto migrations.

Advanced APIs

Most apps should use the DuckFeeder.Runtime wrapper above. For custom orchestration, see the module docs.


Status

End-to-end architecture is implemented: Postgres CDC → Parquet → S3/GCS → DuckLake metadata → DuckDB queries.

See docs/current_status.md for the detailed roadmap.

License

See LICENSE file.