DustEcto

Ecto-shaped facade over Dust. Use Ecto.Schema, Ecto.Changeset, and a Repo-like module to talk to a Dust store from Phoenix apps without writing a custom HTTP client.

defmodule MyApp.Reading.Link do
use DustEcto.Schema,
prefix: ["links"],
required: [:slug, :title, :url]
embedded_schema do
field :title, :string
field :url, :string
field :note, :string
end
def changeset(link, attrs) do
link
|> cast(attrs, [:slug, :title, :url, :note])
|> validate_required(__dust_required_fields__())
|> validate_dust_slug(:slug)
end
end
{:ok, link} =
%MyApp.Reading.Link{}
|> MyApp.Reading.Link.changeset(%{slug: "dust", title: "Dust", url: "https://dustlayer.io"})
|> DustEcto.Repo.insert()
{:ok, [%MyApp.Reading.Link{} | _]} = DustEcto.Repo.all(MyApp.Reading.Link)

DustEcto.Repo is not an Ecto.Repo. It's a deliberately small surface that maps cleanly onto Dust's KV model. The parts that don't map (where, from, preload, transaction) aren't there. See Limitations.


Quick start

# mix.exs
def deps do
[
{:dustlayer_ecto, "~> 0.1"}
]
end
# config/runtime.exs
config :dustlayer_ecto,
store: System.get_env("DUST_STORE") || "myorg/mystore",
base_url: System.get_env("DUST_BASE_URL") || "https://dustlayer.io",
token: System.fetch_env!("DUST_TOKEN")
# lib/my_app/reading/link.ex
defmodule MyApp.Reading.Link do
use DustEcto.Schema, prefix: ["links"], required: [:slug, :title]
embedded_schema do
field :title, :string
field :note, :string
end
def changeset(link, attrs) do
link
|> cast(attrs, [:slug, :title, :note])
|> validate_required(__dust_required_fields__())
|> validate_dust_slug(:slug)
end
end
# in IEx or a context module
alias DustEcto.Repo
alias MyApp.Reading.Link
%Link{} |> Link.changeset(%{slug: "hello", title: "Hello"}) |> Repo.insert()
{:ok, [link]} = Repo.all(Link)
Repo.delete(Link, "hello")

That's a working installation against the deployed dustlayer.io. No supervision tree, no migrations. Realtime subscriptions need extra setup — see Subscribe.

For hot-path reads in a Phoenix app, prefer SDK mode: run your Dust supervisor, configure config :dustlayer_ecto, :dust_facade, MyApp.Dust, and let reads come from the local cache. The minimal HTTP configuration above is best for scripts, release tasks, and low-frequency stateless access.


Storage modes — :flat (default) vs :map

The single most important configuration choice. Pick :flat unless you know you want :map.

:flat (default):map
Wire shapeN leaves at <prefix>.<slug>.<field>One value at <prefix>.<slug>
Writes per recordN PUTs1 PUT
Atomic?No (partial state observable mid-write)Yes (one revision per record)
Multi-writer composabilityYes — other clients edit one field without knowing the restNo — any external write to a field races a :map write that clobbers the whole record
CAS granularityPer leaf (use batch_write/1)Per record (use :if_match on update/2)

Storage diagram for a record MyApp.Reading.Link{slug: "foo", title: "Foo", url: "https://foo"}:

:flat (default) :map
───────────────── ─────────────────
links.foo.title "Foo" links.foo {title: "Foo",
links.foo.url "..." url: "..."}

When to pick :flat:

When to pick :map:

Reads work identically in both modes. Repo.get/2 GETs the slug path and the server returns the assembled value either way.


Transport detection

Two transports ship: DustEcto.Transport.SDK (recommended; uses Dust.Supervisor for realtime + local cache) and DustEcto.Transport.HTTP (Req-based, stateless, no realtime).

DustEcto.Transport.pick/0 runs on every Repo call. Selection order:

  1. Explicit config :dustlayer_ecto, :dust_facade, MyApp.Dust → SDK mode.
  2. Dust.SyncEngineRegistry has the configured store running → SDK mode using the global Dust facade.
  3. Otherwise → HTTP mode.

To verify which transport is active:

{transport, _config} = DustEcto.Transport.pick()
# => {DustEcto.Transport.HTTP, %{...}} | {DustEcto.Transport.SDK, %{...}}

The check is cheap (one or two ETS lookups), so starting Dust.Supervisor at runtime promotes you from HTTP to SDK with no code change.


Repo surface

all/1 stream/1 get/2 get!/2
exists?/2 insert/1 update/1,2 delete/1,2,3
delete_all/1 batch_write/1 subscribe/2 subscribe_raw/2
unsubscribe/1

All write functions return {:ok, struct} | {:error, %Ecto.Changeset{} | %DustEcto.Error{}}. Reads return {:ok, term} | {:error, :not_found | %DustEcto.Error{}}.


Error handling

All transport-level failures land as %DustEcto.Error{kind, detail, retryable?}. Pattern-match on :kind to decide what to do:

case Repo.insert(cs) do
{:ok, struct} -> ...
{:error, %Ecto.Changeset{} = cs} -> # validation failed
{:error, %DustEcto.Error{kind: :conflict}} -> # CAS lost the race
{:error, %DustEcto.Error{kind: :rate_limited, detail: %{retry_after: s}}} ->
# back off s seconds and retry
{:error, %DustEcto.Error{kind: :not_implemented}} ->
# deployed server doesn't expose this op — likely a deploy lag
{:error, %DustEcto.Error{retryable?: true}} -> # transient — retry
{:error, %DustEcto.Error{}} -> # bail
end
kindWhen you'll see it
:networkReq call failed before reaching the server (DNS, TLS, refused). Retryable.
:httpUnrecognized non-2xx status. 5xx is retryable, 4xx isn't.
:conflictIf-Match precondition failed. detail has current_revision.
:not_supportedFeature unavailable on the active transport (e.g. subscribe in HTTP mode).
:not_implementedServer returned 404 on a whole route — the deployed server is older than dustlayer_ecto expects.
:nothing_to_writeinsert/update had no fields to send. Usually a bug in the caller's changeset.
:timeoutSDK write didn't get an ack in time. Don't blind-retry; the write may still land.
:unauthorizedToken rejected.
:invalid_paramsServer rejected the request shape (other than 404).
:rate_limited429. detail.retry_after carries the header. Retryable.

CAS — :if_match

Optimistic concurrency on writes. The server enforces leaf-only CAS, so the semantics depend on storage mode:

:map mode — single PUT, single revision per record:

{:ok, entry} = DustEcto.Transport.HTTP.get(store, "links.foo")
# entry.revision is the current server revision
cs = Link.changeset(link, %{title: "new"})
case Repo.update(cs, if_match: entry.revision) do
{:ok, _} -> :saved
{:error, %DustEcto.Error{kind: :conflict}} -> :reload_and_retry
end

:map mode delete:

Repo.delete(Link, "foo", if_match: 7)
# or
Repo.delete(%Link{slug: "foo"}, if_match: 7)

:flat mode:update/2 with if_match:raises — there's no single revision to compare against. For atomic multi-field CAS in :flat mode, use batch_write/1:

Repo.batch_write([
{:update, link1_cs, if_match: 5},
{:update, link2_cs, if_match: 9}
])
# committed atomically server-side; if any if_match fails, none lands

Atomic multi-record writes — batch_write/1

Repo.batch_write([
{:insert, Link.changeset(%Link{}, attrs1)},
{:insert, Link.changeset(%Link{}, attrs2)},
{:update, existing_link_cs, if_match: 7},
{:delete, Link, "stale-slug"},
{:delete, Link, "old", if_match: 4}
])

Validates each changeset short-circuit-style — if any fails, {:error, %Ecto.Changeset{}} and nothing is sent. Otherwise the whole batch commits atomically server-side.

In :flat mode, each insert/update expands to N wire ops (one per non-nil field). :if_match on a :flat op raises — per-field CAS needs per-field revisions, which v1 doesn't surface.


Subscribe

Realtime subscriptions are only available when the SDK transport is active — i.e. Dust.Supervisor is in your supervision tree. From HTTP mode, Repo.subscribe/2 returns {:error, %DustEcto.Error{kind: :not_supported}}.

Setting up the SDK supervisor

# lib/my_app/dust.ex
defmodule MyApp.Dust do
use Dust, otp_app: :my_app
end
# config/runtime.exs
config :my_app, MyApp.Dust,
stores: ["myorg/mystore"],
repo: MyApp.Repo
config :dustlayer_ecto, :dust_facade, MyApp.Dust
# lib/my_app/application.ex
children = [
MyApp.Repo,
MyApp.Dust, # ← add this
MyAppWeb.Endpoint
]

If you're in a Phoenix app, use the PubSub bridge — one line in mount/3, no callback discipline to remember, automatic cleanup:

defmodule MyAppWeb.LinksLive do
use MyAppWeb, :live_view
alias MyApp.Reading.Link
def mount(_, _, socket) do
if connected?(socket) do
:ok = DustEcto.Phoenix.subscribe_to_pubsub(Link, MyApp.PubSub, "links")
end
{:ok, assign(socket, links: load_links())}
end
def handle_info({:dust_event, {:upserted, %Link{} = link}}, socket),
do: {:noreply, update(socket, :links, &upsert_by_slug(&1, link))}
def handle_info({:dust_event, {:deleted, slug}}, socket),
do: {:noreply, update(socket, :links, &delete_by_slug(&1, slug))}
end

Add {:phoenix_pubsub, "~> 2.0"} to your deps (most Phoenix projects already have it). No terminate/2 cleanup — Phoenix.PubSub monitors subscribers and unsubscribes automatically. The bridge starts one shared broadcaster per topic so 100 LiveViews subscribed to "links" cost one Dust subscription, not 100.

Raw Repo.subscribe/2

If you can't use Phoenix.PubSub (release script, non-Phoenix app, custom fan-out), drop down to Repo.subscribe/2 directly:

{:ok, ref} =
DustEcto.Repo.subscribe(Link, fn
{:upserted, %Link{} = link} -> handle_upsert(link)
{:deleted, slug} -> handle_delete(slug)
end)
# later
DustEcto.Repo.unsubscribe(ref)

The callback runs inside the SDK's per-store sync engine process. If it blocks, every subscriber on that store waits. The standard safe pattern is to send a message and return immediately:

pid = self()
{:ok, _ref} =
DustEcto.Repo.subscribe(Link, fn event ->
send(pid, {:link, event})
:ok
end)

If pid dies without unsubscribing, the SDK registry keeps the callback and send/2s into a dead pid for every subsequent write. Track the ref and Repo.unsubscribe/1 it on shutdown. This is exactly the bookkeeping the PubSub bridge eliminates.

subscribe_raw/2 is the lower-level escape hatch — callback receives the raw event map %{op:, path:, value:, store_seq:, ...} instead of the assembled struct. Useful for provenance or custom assembly.


Migrating from a hand-rolled client

If you've already built a thin wrapper around the Dust HTTP API (Client, Schema, Repo modules of your own), the mapping is mechanical:

Hand-rolledDustEcto
MyApp.Dust.ClientDelete entirely — DustEcto.Transport.HTTP replaces it.
use MyApp.Dust.Schema, prefix: "foo"use DustEcto.Schema, prefix: ["foo"], required: [...]
MyApp.Dust.Repo.all/get/insert/updateDustEcto.Repo.all/get/insert/update (1-for-1)
MyApp.Dust.Repo.soft_delete (null-PUT workaround)DustEcto.Repo.delete/2 (real delete; needs Dust server ≥ 0.1)
{:error, {:http, status, body}} tuples{:error, %DustEcto.Error{}} — pattern-match on :kind

Config rename: whatever app key you used (:my_app, MyApp.Dust) becomes :dustlayer_ecto directly.


Limitations

Not supportedWhy / workaround
Ecto.Query (where, from, join, preload)Dust is KV, not relational. Filter in Elixir after Repo.all/1, or use a prefix-shaped key design.
insert_all/2Use batch_write/1 with a list of {:insert, cs} ops.
transaction/1Use batch_write/1 for atomic multi-record commits.
Repo.insert/1 insert-or-fail semanticsDust writes are upserts. If you need fail-on-duplicate, Repo.exists?/2 first and accept that another writer can race you.
Per-field CAS in :flat mode update/2Use batch_write/1 with per-op :if_match.

Environment variables

Config keys (under :dustlayer_ecto):

KeyDefaultWhere to get it
:tokenrequiredThe store API token. Create one at the Dust dashboard. Secret — keep it out of repo.
:storerequiredThe Dust store name as org/name.
:base_urlhttps://dustlayer.ioOverride only for self-hosted Dust or a staging instance.

Minimum config:

config :dustlayer_ecto,
store: System.fetch_env!("DUST_STORE"),
token: System.fetch_env!("DUST_TOKEN")

If you're hitting a non-default Dust host:

config :dustlayer_ecto,
store: System.fetch_env!("DUST_STORE"),
token: System.fetch_env!("DUST_TOKEN"),
base_url: System.fetch_env!("DUST_BASE_URL")

Config changes need a server restart in dev — Phoenix's code reload doesn't reread Application.put_env from .env files.


Server compatibility

dustlayer_ectoRequired dust server
0.1.x0.1.x (DELETE and batch_write routes). Older servers will surface %DustEcto.Error{kind: :not_implemented} on those calls.

The deployed instance at dustlayer.io tracks the latest released server. If you self-host, mind the matrix.


License

MIT.